Using Spark to Process Data From Cassandra for Analytics

After my presentation about Apache Cassandra, many people asked whether they could run analytical queries on Cassandra, and how they could integrate Spark with Cassandra. So I decided to write a blog post demonstrating how to process data stored in Cassandra using Spark. In this post, I’ll show how to build a testing environment on Oracle Cloud (Spark + Cassandra), load sample data into Cassandra, and query the data using Spark.

Let me first create an Oracle Big Data Cloud instance. Instead of installing Spark manually, I’ll use the Big Data Cloud service so I get both Spark and Zeppelin. Zeppelin is a web-based notebook for interactive data analytics; I’ll use it to run Spark scripts and queries.

I log in to Oracle Cloud and start creating a Big Data Cloud service. I select “Basic” for the deployment profile because I do not need Hive, I want only one node (for testing), and I select Spark version 2.1. After the service is created, I go to “Access rules” and enable ora_p2bdcsce_ssh because I will need to connect to my server through SSH.

I will also need a virtual machine to run Cassandra. For simplicity, I’ll use the “Compute Classic” service and create only one node. I select Oracle Linux and name it “CAS1”.

When I was writing this blog post, there was a bug related to JDK 1.8, so I decided to install JDK 1.7 and Cassandra 2.2. I recommend you install JDK 1.8 and Cassandra 3.

I connect with the opc user and switch to root, install the JDK and wget, download and extract the Cassandra tarball, and create the data and commitlog directories for Cassandra. Then I create a new user for Cassandra and give ownership of the directories I created to the new user.
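The exact commands depend on the versions you pick; roughly, they look like this (the JDK package, the Cassandra version and download URL, and the /cassandra2 paths below are simply the layout I use in the rest of this post):

    sudo su -
    yum install -y wget java-1.7.0-openjdk            # JDK 1.7 because of the bug mentioned above
    cd /
    wget http://archive.apache.org/dist/cassandra/2.2.12/apache-cassandra-2.2.12-bin.tar.gz
    tar -xzf apache-cassandra-2.2.12-bin.tar.gz
    mv apache-cassandra-2.2.12 /cassandra2            # Cassandra home used in this post
    mkdir -p /cassandra2/data /cassandra2/commitlog   # data and commitlog directories
    useradd cassandra                                 # new user for Cassandra
    chown -R cassandra:cassandra /cassandra2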

I edit the Cassandra configuration file (/cassandra2/conf/cassandra.yaml) and modify cluster_name, data_file_directories, commitlog_directory, rpc_address, listen_address, and seed_provider. Please check the new values of these settings:
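In my setup, the relevant part of cassandra.yaml ends up looking roughly like this (the cluster name and directories follow the layout above; use your own values):

    cluster_name: 'CAS1'
    data_file_directories:
        - /cassandra2/data
    commitlog_directory: /cassandra2/commitlog
    rpc_address: 192.168.1.2
    listen_address: 192.168.1.2
    seed_provider:
        - class_name: org.apache.cassandra.locator.SimpleSeedProvider
          parameters:
              - seeds: "192.168.1.2"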

You may wonder why I use 192.168.1.2: it’s the default IP of eth0 on Compute Classic instances. I need my Cassandra instance to listen on the eth0 interface because the public IP is forwarded to eth0, so I give the IP address of eth0. You can check the IP addresses of your node with “ifconfig”. There is an interesting thing about rpc_address: although RPC is disabled by default (and I do not enable it), I still need to edit rpc_address, because if I leave it as it is (it’s set to localhost by default), Cassandra listens only on localhost (the loopback adapter). It looks like a bug to me.

After saving the configuration file, I start my Cassandra instance. Everything looks OK. Now I need a sample data set. I found an interesting one on data.gov: incidents of crime in Chicago. I use wget to download the file as “crimes.csv”:
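Something like the following; the actual download link comes from the dataset’s page on data.gov, so the URL below is only a placeholder:

    wget -O crimes.csv "https://DATASET_DOWNLOAD_URL"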

To be able to import this data, I need to create a keyspace and a table. So I connect to Cassandra using cqlsh:
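Since Cassandra listens on the eth0 address, I point cqlsh at that IP:

    /cassandra2/bin/cqlsh 192.168.1.2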

and I run the following CQL queries to create the keyspace and the table:
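Here is a sketch of the CQL; the keyspace name (“chicago”) and the column list are illustrative, and the real CSV contains more columns, so adapt the table definition to the fields you want to keep:

    CREATE KEYSPACE chicago
      WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

    CREATE TABLE chicago.crimes (
      id int PRIMARY KEY,
      case_number text,
      date text,
      primary_type text,
      description text,
      location_description text,
      arrest boolean,
      year int
    );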

Then I use the COPY command of cqlsh to load the CSV data into Cassandra:
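Assuming the table above and a CSV trimmed to the same columns in the same order (with a header row), the COPY command looks roughly like this (run cqlsh from the directory where crimes.csv was downloaded, or give the full path):

    COPY chicago.crimes (id, case_number, date, primary_type, description, location_description, arrest, year)
      FROM 'crimes.csv' WITH HEADER = TRUE;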

It took 24 minutes to insert 6.5 million records (on a 1 core virtual machine with zero tuning).

My Cassandra server does not accept any remote connections because of the Oracle Cloud security settings, so I make the required changes to the security rules to allow my Big Data Cloud server to connect to my Cassandra server.

I go to the Compute Classic Service Console and open the network page. On the network page, I define a new “IP Address Prefix Set” and enter the IP address of my Big Data Cloud server (you can find its IP address on its service page). By the way, I need to enter the IP address in CIDR notation (IP address + prefix length). So let’s say I have only one server and its IP address is 130.15.15.15; in this case, I enter it as 130.15.15.15/32.

Then I define a security protocol. It’s nothing more than telling Oracle Cloud which port your application uses; Cassandra uses port 9042.

In the last step, I create a new security rule to allow my Cassandra server to accept TCP connections through port 9042 from my Big Data server. Do not forget to pick the access control list of your instance; otherwise the security rule will not be effective.

Now I have two servers: Spark and Zeppelin run on one of them, and Cassandra runs on the other. The last thing I need to do is install the Spark-Cassandra Connector (provided by DataStax) on my Big Data Cloud server. Because I will use PySpark, I will skip downloading and compiling Java code and directly fetch the required JAR files from the Spark Packages website. I connect to my Big Data Cloud server using SSH, switch to the “root” user, and run this command:
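The command is just a dummy spark-shell invocation; its only purpose is to let Spark resolve and download the connector JARs. The connector version below (2.0.x, built for Scala 2.11) is what I would pick to match Spark 2.1, so check the Spark Packages website for the version that matches your setup:

    spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.7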

I ignore the error messages; all I need to see is a message indicating that the connector JARs were resolved and downloaded.

The spark-cassandra-connector JAR is now downloaded. We will use two JAR files (com.datastax.spark_spark-cassandra-connector_VERSION.jar and com.twitter_jsr166e-VERSION.jar). These JARs are in the .ivy2/jars folder in your user’s home directory. If you check the directory, you will see there are more JARs. In my tests, I didn’t need the rest of them, but if you get an error about missing Java classes when working with Cassandra, you may add the relevant JAR files to the Spark interpreter settings.

I create a new directory, and copy all the JARs into the new directory:
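For example (the target directory name is just something I picked):

    mkdir -p /u01/cassandra-jars
    cp ~/.ivy2/jars/*.jar /u01/cassandra-jars/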

I go to the Big Data Cloud Console, open the notebook settings page, find the Spark interpreter, and click the edit button.

I add the paths of the spark-cassandra-connector JAR files (both com.datastax.spark_spark-cassandra-connector and com.twitter_jsr166e), click the SAVE button, and then click OK to update the settings. Now I can use Zeppelin and Spark to connect to the Cassandra database. I go to the Zeppelin notebook page and create a new notebook called “Spark and Cassandra Tests”.

I will access Cassandra tables using the org.apache.spark.sql.cassandra data source. In the new Zeppelin notebook, I write the following PySpark code:
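Here is a minimal sketch of that code; the keyspace name (“chicago”) follows the illustrative CQL above, and CASSANDRA_NODE_IP is a placeholder for the IP address of the Cassandra server:

    %pyspark

    crime_data = spark.read.format("org.apache.spark.sql.cassandra").options(table="crimes", keyspace="chicago", **{"spark.cassandra.connection.host": "CASSANDRA_NODE_IP"}).load().select("primary_type", "date", "year", "description", "location_description").filter("year in (2016, 2017)").cache()

    print(crime_data.count())

    crime_data.registerTempTable("crimes")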

Line 3) In this line, I use the spark object to connect to the Cassandra cluster. I give the node address, the keyspace, and the table name. The load function returns a Spark DataFrame. I do not need all columns, so I do a projection and pick only the “primary_type”, “date”, “year”, “description”, and “location_description” fields. I filter only the rows belonging to years 2016 and 2017. The cache() function will keep the data in cache for repeated queries. This line of code will not fetch any data until we call an action method such as count or collect.
Line 5) I call the count method, so Spark will actually read data from the Cassandra table.
Line 7) I register the resulting DataFrame as a temporary table so I can query it with Spark SQL.

After running the above script, I can query the crimes table. Let’s find the most common crimes:
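A query along these lines, using Zeppelin’s %sql interpreter against the temporary table registered above (the year filter and column names follow the sketch; adjust them to your data):

    %sql
    SELECT primary_type, count(*) AS total
    FROM crimes
    WHERE year = 2017
    GROUP BY primary_type
    ORDER BY total DESC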

As I can see, in 2017 the most common crime is “theft”. Now I will check the subtypes (descriptions) of this crime:
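For example (in the Chicago data set the primary_type values are stored in uppercase, hence 'THEFT'):

    %sql
    SELECT description, count(*) AS total
    FROM crimes
    WHERE year = 2017 AND primary_type = 'THEFT'
    GROUP BY description
    ORDER BY total DESC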

This time, instead of viewing the results in the data grid, I selected the pie chart. Now let’s compare the number of thefts in 2016 and 2017.
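A sketch of such a query; it assumes the date column keeps the original MM/DD/YYYY text format, so the month can be extracted with substring:

    %sql
    SELECT substring(date, 1, 2) AS month, year, count(*) AS thefts
    FROM crimes
    WHERE primary_type = 'THEFT'
    GROUP BY substring(date, 1, 2), year
    ORDER BY month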

I used a line graph to see the difference and trend in theft incidents. After analyzing the data, it’s possible to write the results back to Cassandra:
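Here is a sketch of how that write looks with the connector; the target table (I call it crimes_summary here) must already exist in the Cassandra keyspace with matching column names and types before the save:

    %pyspark

    # aggregate the cached crimes data with Spark SQL
    summary = spark.sql("SELECT year, primary_type, count(*) AS total FROM crimes GROUP BY year, primary_type")

    # append the result into the existing Cassandra table (crimes_summary must be created up front)
    summary.write.format("org.apache.spark.sql.cassandra").options(table="crimes_summary", keyspace="chicago", **{"spark.cassandra.connection.host": "CASSANDRA_NODE_IP"}).mode("append").save()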

When I query the summary table from Cassandra, I see that it’s populated:
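For example, from cqlsh on the Cassandra server:

    SELECT * FROM chicago.crimes_summary LIMIT 10;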

That’s all for now. I hope this blog post helps you understand how you can use Spark to process data from Cassandra.


Gokhan Atil is a database administrator with hands-on experience in both RDBMS and NoSQL databases and a strong background in software development. He is an Oracle Certified Professional (OCP) and was named an Oracle ACE in 2011 and an Oracle ACE Director in 2016 for his continuous contributions to the Oracle user community.
