The right way to use Spark and JDBC

A while ago I had to read data from a MySQL table, do a bit of manipulations on that data and store the results on the disk.
The obvious choice was to use Spark, I was already using it for other stuff and it seemed super easy to implement.

This is more or less what I had to do (I removed the part which does the manipulation for the sake of simplicity):

spark.read.format("jdbc").
option("url", "jdbc:mysql://dbhost/sbschhema").
option("dbtable", "mytable").
option("user", "myuser").
option("password", "mypassword").
load().write.parquet("/data/out")

Looks good, only it didn’t quite work. Either it was super slow or it totally crashed depends on the size of the table.
Tuning Spark and the cluster properties helped a bit, but it didn’t solve the problems.

Since I was using AWS EMR, it made sense to give Sqoop a try since it is a part of the applications supported on EMR.

sqoop import --verbose --connect jdbc:mysql://dbhost/sbschhema --username myuser --table opportunity --password  mypassword --m 20 --as-parquetfile --target-dir /data/out

Sqoop performed so much better almost instantly, all you needed to do is to set the number of mappers according to the size of the data and it was working perfectly.

Since both Spark and Sqoop are a based on Hadoop map-reduce framework, it was clear that Spark can work at least as good as Sqoop, I only needed to find out how to do it. I decided to look closer at what Sqoop does to see if I can imitate that with Spark.

By turning on the verbose flag of Sqoop, you can get a lot more details.
What I found was that Sqoop is splitting the input to the different mappers which makes sense, this is map-reduce after all, Spark does the same thing.
But before doing that, Sqoop does something smart that Spark doesn’t do.

It first fetches the primary key (unless you give him another key to split the data by), it then checks it’s minimum and maximum values. Then it lets each of its mappers to query the data but with different boundaries for the key, so that the rows are split evenly between the mappers.

If for example the key maximum value is 100, and there are 5 mappers, than the query of the 1st mapper will look like this:

SELECT * FROM mytable WHERE mykey >= 1 AND mykey <= 20;

and the query for the second mapper will be like this:

SELECT * FROM mytable WHERE mykey >= 21 AND mykey <= 40;

and so on..

This totally made sense. Spark was not working properly because it didn’t know how to split the data between the mappers.

So it was time to implement the same logic with Spark.
This means I had to do these actions on my code to make Spark work properly.
1. Fetch the primary key of the table
2. Find the key minimum and maximum values
3. Execute spark with those values

This is the code I ended up with:

def main(args: Array[String]){

	// parsing input parameters ...

	val primaryKey = executeQuery(url, user, password, s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
	val result = executeQuery(url, user, password, s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
    val min = result.getString(1).toInt
    val max = result.getString(2).toInt
    val numPartitions = (max - min) / 5000 + 1

	val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()

	var df = spark.read.format("jdbc").
		option("url", s"${url}${config("schema")}").
		option("driver", "com.mysql.jdbc.Driver").
		option("lowerBound", min).
		option("upperBound", max).
		option("numPartitions", numPartitions).
		option("partitionColumn", primaryKey).
		option("dbtable", config("table")).
		option("user", user).
		option("password", password).load()
		
	// some data manipulations here ...

	df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)      

}

And it worked perfectly.

Remarks:
1. The numPartitions I set for Spark is just a value I found to give good results according to the number of rows. This can be changed, since the size of the data is also effected by the column size and data types of course.
2. The repartition action at the end is to avoid having small files.

Leave a Reply

Your email address will not be published.