Using Spark to join data from CSV and MySQL Table

Yesterday, I explained how we can access a MySQL database from Zeppelin, which comes with Oracle Big Data Cloud Service Compute Edition (BDCSCE). Although we can use Zeppelin to access MySQL, we still need something more powerful to combine data from two different sources (for example, data from a CSV file and RDBMS tables). Spark is a great choice for processing data. In this blog post, I’ll write a simple PySpark (Python for Spark) script which reads from MySQL and CSV, joins the data and writes the output back to MySQL. Please keep in mind that I use Oracle BDCSCE, which supports Spark 2.1, so I tested my code only on Spark 2.1 and used the Zeppelin environment. I assume you run all these steps in the same environment; otherwise, you may need to modify the paths and the code.

For my sample script, I’ll use the flight information belonging to the year 2008. If you have read my blog post series about BDCSCE, you should be familiar with it. Anyway, do not worry about the data structure; I use only a few columns of the data, and you can get more information about it on the statistical computing website.

First, I’ll create a table on MySQL to store the most active carriers (in 2008). I have already configured Zeppelin to access my MySQL database, so I create a new paragraph, put the following SQL commands in it and run them.
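A minimal sketch of those commands; the table name carriers matches the rest of the post, while the column definitions and the seven carrier codes and descriptions below are my illustrative picks for the busiest US carriers of 2008:

```sql
CREATE TABLE carriers (
  code VARCHAR(10) NOT NULL PRIMARY KEY,
  description VARCHAR(100)
);

INSERT INTO carriers VALUES ('WN', 'Southwest Airlines');
INSERT INTO carriers VALUES ('AA', 'American Airlines');
INSERT INTO carriers VALUES ('OO', 'SkyWest Airlines');
INSERT INTO carriers VALUES ('MQ', 'American Eagle Airlines');
INSERT INTO carriers VALUES ('US', 'US Airways');
INSERT INTO carriers VALUES ('DL', 'Delta Air Lines');
INSERT INTO carriers VALUES ('UA', 'United Airlines');
```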

By executing the above SQL statements, we create a table named “carriers” and insert 7 rows. I use the following code to download and unzip the flight data from the statistical computing website:
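A Zeppelin paragraph using the sh interpreter can handle the download; the dataset URL below points to the Data Expo 2009 page of the statistical computing website, and the HDFS target path /tmp/2008.csv is my assumption (the later read step must use the same path):

```shell
%sh
# Download the 2008 flight data (bzip2-compressed) and unzip it
wget http://stat-computing.org/dataexpo/2009/2008.csv.bz2
bzip2 -d 2008.csv.bz2
# Put the file into HDFS so Spark can read it
hadoop fs -put 2008.csv /tmp/2008.csv
```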

If you get an error (Paragraph received a SIGTERM), increase the shell.command.timeout.millisecs setting of the sh interpreter. If everything went OK, we should have the 2008.csv file in HDFS and the carriers table on our MySQL server. Now it’s time to read data from the MySQL carriers table into a carriers dataframe:

Do not forget to modify the code according to your MySQL credentials and server address. Let me explain the code: first I create a variable for storing the credentials (for code readability), then I use Spark’s sqlContext object to read from the JDBC source. As you can see, I pass the URL, the table name and my credentials (as properties). I assign the output to the “carriers” object and then call the “show” method to fetch the data and display it on screen.

Here’s the next block:

In this block, I read the flight information from the CSV file, create a mapper function to parse the data, apply the mapper function and assign the output to a dataframe object, then join the flight data with the carriers data, group it to count the flights by carrier code, and sort the output. Finally, I call the show method to display the result. For my sample, I do not need to parse all these columns, but I just copied the parser from one of my old blog posts. You can surely write a simpler parsing function.

Now we can write the result dataframe to a MySQL table:

I use the write method of the dataframe to write its content to a table named “flights_carriers”. By the way, on the first run of this script the mode parameter is not required, because the flights_carriers table does not exist yet.

After running all these blocks, I checked my flights_carriers table, and as you can see, it contains the data calculated by the Spark script. You can use the same method to combine data from different data sources such as JSON, Oracle RDBMS, Hive etc. See you in the next blog post!


AWS Big Data Specialist. Oracle Certified Professional (OCP) for EBS R12, Oracle 10g and 11g. Co-author of "Expert Oracle Enterprise Manager 12c" book published by Apress. Awarded as Oracle ACE (in 2011) and Oracle ACE Director (in 2016) for the continuous contributions to the Oracle users community. Founding member, and vice president of Turkish Oracle User Group (TROUG). Presented at various international conferences including Oracle Open World.

1 Comment

  1. Hitendra R Chavan

    Hi Gokhan,

    I am totally new to Python and PySpark. What I am trying to do is fetch data from a flat file and load it into a Snowflake database.
    Below is my code, but I am getting an error on the last line. I would also like to know whether that last line will load the data into Snowflake.

    ```python
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession

    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

    sfOptions = {
        "sfURL": "",
        "sfUser": "hitenchavanaws",
        "sfPassword": "Th1s1s1t",
        "sfDatabase": "demo_db",
        "sfSchema": "public",
        "sfWarehouse": "compute_wh",
        "parallelism": "64"
    }

    spark = SparkSession.builder.appName("my_app").config("spark.sql.codegen.wholeStage", False).getOrCreate()

    df ="delimiter", ",").csv("test.csv", header=False)

    df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "t1").mode("append").save()
    ```

    Thank you
    Hitendra R Chavan
