How to Use an AWS S3 Bucket for the Spark History Server

Since EMR version 5.25, it’s possible to debug and monitor your Apache Spark jobs by connecting directly to the off-cluster, persistent Apache Spark History Server using the EMR Console. You do not need to do anything extra to enable it, and you can access the Spark history even after the cluster is terminated. The logs are available for active clusters and are retained for 30 days after the cluster is terminated.

Although this is a great feature, it has limitations: each EMR cluster keeps its logs in a different bucket, the number of active Spark History Server UIs cannot exceed 50 per AWS account, and if you want to keep the logs for more than 30 days after the cluster is terminated, you need to copy them to another bucket and then create a Spark History Server for them.

To overcome these limitations and get a more flexible way to access the Spark history, you can configure Spark to send its event logs to an S3 bucket.
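For example, here is a minimal configuration sketch using the EMR “spark-defaults” classification that enables event logging and points both the event log directory and a history server at the same S3 location. The bucket name and prefix below are placeholders:

```json
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.eventLog.enabled": "true",
      "spark.eventLog.dir": "s3://my-spark-history-bucket/eventlogs/",
      "spark.history.fs.logDirectory": "s3://my-spark-history-bucket/eventlogs/"
    }
  }
]
```

Any Spark History Server that can read that bucket can then serve the same logs, independent of the lifetime of the cluster that produced them.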

Query an HBase Table through Hive Using PySpark on EMR

In this blog post, I’ll demonstrate how we can access an HBase table through Hive from a PySpark script/job on an AWS EMR cluster. First, I created an EMR cluster (EMR 5.27.0, Hive 2.3.5, HBase 1.4.0). Then I connected to the master node, launched “hbase shell”, created an HBase table, and inserted a sample row:
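(The commands below are a minimal reconstruction of that step; the table name, column family, and values are placeholders, not necessarily the ones used in the original post.)

```
# inside "hbase shell" on the master node
create 'mytable', 'cf'                       # table with one column family
put 'mytable', 'row1', 'cf:name', 'John'     # insert a sample row
scan 'mytable'                               # verify the row is there
```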

Next, I logged in to Hive and created a Hive table that points to the HBase table:
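(A reconstruction along these lines; the column mapping matches the placeholder HBase table from the previous step.)

```sql
-- external Hive table mapped onto the existing HBase table
CREATE EXTERNAL TABLE myhivetable (key STRING, name STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:name")
TBLPROPERTIES ("hbase.table.name" = "mytable");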

When I tried to access the table using spark.table(‘myhivetable’), I got an error saying that the org.apache.hadoop.hive.hbase.HBaseStorageHandler class was not found. I tried the “--packages” parameter to fetch the required JAR from the Maven repository; it downloaded a lot of missing JARs, but it did not solve the problem. So I downloaded the required JAR file using wget and copied it to Spark’s JAR directory:
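(A sketch of those commands; the JAR version should match the Hive version on the cluster, 2.3.5 here, and the Maven URL and target path are assumptions about the EMR layout.)

```bash
# put the Hive-HBase storage handler on Spark's classpath
cd /usr/lib/spark/jars/
sudo wget https://repo1.maven.org/maven2/org/apache/hive/hive-hbase-handler/2.3.5/hive-hbase-handler-2.3.5.jar
```

After restarting the PySpark session, spark.table(‘myhivetable’).show() should be able to read the HBase-backed table through Hive.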

PySpark Examples #5: Discretized Streams (DStreams)

This is the fourth blog post in which I share sample scripts from my presentation “Apache Spark with Python“. Spark supports two different ways of streaming: Discretized Streams (DStreams) and Structured Streaming. DStreams is the basic abstraction in Spark Streaming: a continuous sequence of RDDs representing a stream of data. Structured Streaming is the newer way of streaming, built on the Spark SQL engine. In the next blog post, I’ll share a sample script about Structured Streaming, but today I will demonstrate how we can use DStreams.

When you search for example scripts about DStreams, you mostly find sample code that reads data from TCP sockets. So I decided to write a different one: my sample code reads from files located in a directory. The script checks the directory every second and processes the new CSV files it finds. Here’s the code:
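(The script below is a sketch reconstructing the idea; the input directory and the column used for counting are placeholders, not necessarily what the original example used.)

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# batch interval of 1 second; the input directory is a placeholder
sc = SparkContext(appName="CSVDirectoryStream")
ssc = StreamingContext(sc, 1)

# textFileStream monitors the directory and picks up files that appear after the stream starts
lines = ssc.textFileStream("/tmp/stream_input")

# treat each line as a CSV record and count records per value of the first column
# (the column choice is arbitrary, just to show a simple transformation)
counts = (lines
          .map(lambda line: (line.split(",")[0], 1))
          .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()
```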

PySpark Examples #3-4: Spark SQL Module

In this blog post, I’ll share examples #3 and #4 from my presentation to demonstrate the capabilities of the Spark SQL module. As I explained in my previous blog posts, the Spark SQL module provides DataFrames (and Datasets, but Python does not support Datasets because it is a dynamically typed language) for working with structured data.

First, let’s create a temporary table from a CSV file and run a query on it. As in my previous blog posts, I use the “u.user” file from the MovieLens 100K dataset (saved as users.csv).
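(The snippet below is a sketch of this step; the separator, column names, and the query itself are assumptions about the original script.)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# users.csv is the MovieLens "u.user" file (user id, age, gender, occupation, zip code);
# the separator and schema options are assumptions
users = (spark.read
         .option("sep", "|")
         .option("inferSchema", "true")
         .csv("users.csv")
         .toDF("user_id", "age", "gender", "occupation", "zip_code"))

# register a temporary view and query it with SQL
users.createOrReplaceTempView("users")
spark.sql("""
    SELECT occupation, COUNT(*) AS cnt
    FROM users
    GROUP BY occupation
    ORDER BY cnt DESC
""").show(10)
```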

PySpark Examples #2: Grouping Data from CSV File (Using DataFrames)

I continue to share example code related to my “Spark with Python” presentation. In my last blog post, I showed how we use RDDs (the core data structures of Spark). This time, I will use DataFrames instead of RDDs. DataFrames are distributed collections of data organized into named columns (in a structured way). They are similar to tables in relational databases. They also provide a domain-specific language API to manipulate your distributed data, so they are easier to use.

DataFrames are provided by the Spark SQL module, and they are the primary API for Spark’s machine learning library and the Structured Streaming module. Spark developers recommend using DataFrames instead of RDDs, because Catalyst (the Spark optimizer) will optimize your execution plan and generate better code to process the data.
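(As a sketch of the grouping example from this post; the file layout, column names, and aggregation are assumptions about the original script.)

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("GroupingWithDataFrames").getOrCreate()

# read the CSV file (assumed to be the pipe-separated MovieLens u.user file saved as users.csv)
users = (spark.read
         .option("sep", "|")
         .option("inferSchema", "true")
         .csv("users.csv")
         .toDF("user_id", "age", "gender", "occupation", "zip_code"))

# group with the DataFrame API instead of SQL: average age and row count per occupation
(users.groupBy("occupation")
      .agg(F.avg("age").alias("avg_age"),
           F.count("user_id").alias("cnt"))
      .orderBy(F.desc("cnt"))
      .show(10))
```

The same result could be produced with spark.sql after registering a temporary view, but the DataFrame API keeps the whole pipeline in plain Python method calls.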