How to Use an AWS S3 Bucket for the Spark History Server

Since EMR version 5.25, it’s possible to debug and monitor your Apache Spark jobs by logging directly into the off-cluster, persistent Apache Spark History Server using the EMR Console. You do not need to do anything extra to enable it, and you can access the Spark history even after the cluster is terminated. The logs are available for active clusters and are retained for 30 days after the cluster is terminated.

Although this is a great feature, it has some limitations: each EMR cluster keeps its logs in a different bucket, the number of active Spark History Server UIs cannot exceed 50 per AWS account, and if you want to keep the logs for more than 30 days after a cluster is terminated, you need to copy them to another bucket and then create a Spark History Server for them.

To overcome all these limitations, and to have a more flexible way to access the Spark history, you can configure Spark to send its event logs to an S3 bucket.
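On EMR, one way to do this is through the “spark-defaults” configuration classification when creating the cluster. Here is a minimal sketch; the bucket name and prefix are placeholders, and the history server should read from the same location:

```
# "spark-defaults" classification (or spark-defaults.conf); bucket and prefix are placeholders
spark.eventLog.enabled           true
spark.eventLog.dir               s3://my-spark-history-bucket/eventlogs/
spark.history.fs.logDirectory    s3://my-spark-history-bucket/eventlogs/
```

A Spark History Server pointed at the same spark.history.fs.logDirectory can then serve the event logs of every cluster that writes there, and the logs stay available for as long as you keep them in the bucket.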

Lambda Function to Resize EBS Volumes of EMR Nodes

I have to start by saying that you should not use EMR as a persistent Hadoop cluster. The power of EMR lies in its elasticity: you should launch an EMR cluster, process the data, write the results to S3 buckets, and terminate the cluster. However, we see a lot of AWS customers use EMR as a persistent cluster. So I was not surprised when a customer told me that they needed to resize the EBS volumes of new core nodes of their EMR cluster automatically. The core nodes were configured with 200 GB disks, but now they want 400 GB disks. It’s not possible to change the instance type or EBS volume configuration of core nodes, so a custom solution was needed. I explained to the customer how to do it with some sample Python code, but in the end they gave up on using this method (thank God).

I wanted to see if it could be done anyway. So, for fun and curiosity, I wrote a Lambda function in Java. It should be scheduled to run every 5 or 10 minutes. On every run, it checks whether there’s an ongoing resizing operation. If the resizing is done, it connects to the node and runs the “growpart” and “xfs_growfs” commands to grow the partition and the filesystem. If there’s no resizing operation in progress, it checks all volumes of a specific cluster and starts a resizing operation on any volume that is smaller than a specific size.
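To give an idea of that flow, here is a minimal sketch written with Python and boto3 (the function described above is in Java; the cluster ID, target size, device names, and the use of SSM Run Command for the growpart/xfs_growfs step are assumptions for illustration, not the original code):

```python
import boto3

# Placeholder values for illustration only
CLUSTER_ID = "j-XXXXXXXXXXXXX"   # EMR cluster id
TARGET_SIZE_GB = 400             # desired size for the data volumes

ec2 = boto3.client("ec2")
ssm = boto3.client("ssm")

def lambda_handler(event, context):
    # EMR tags its EC2 instances with the job-flow (cluster) id
    reservations = ec2.describe_instances(
        Filters=[{"Name": "tag:aws:elasticmapreduce:job-flow-id", "Values": [CLUSTER_ID]},
                 {"Name": "instance-state-name", "Values": ["running"]}]
    )["Reservations"]
    instance_ids = [i["InstanceId"] for r in reservations for i in r["Instances"]]
    if not instance_ids:
        return "no running instances found"

    # All EBS volumes attached to those instances
    # NOTE: this also picks up root volumes; a real implementation should filter by device name
    volumes = ec2.describe_volumes(
        Filters=[{"Name": "attachment.instance-id", "Values": instance_ids}]
    )["Volumes"]
    volume_ids = [v["VolumeId"] for v in volumes]

    # Check the state of any previous resize operations
    mods = ec2.describe_volumes_modifications(
        Filters=[{"Name": "volume-id", "Values": volume_ids}]
    )["VolumesModifications"]
    if any(m["ModificationState"] in ("modifying", "optimizing") for m in mods):
        return "a resize is still in progress, nothing to do"

    # For completed resizes, grow the partition and filesystem via SSM Run Command
    # (device name and xfs filesystem are assumptions; a real implementation would also
    # track which volumes were already grown)
    completed = {m["VolumeId"] for m in mods if m["ModificationState"] == "completed"}
    for v in volumes:
        if v["VolumeId"] in completed:
            instance_id = v["Attachments"][0]["InstanceId"]
            ssm.send_command(
                InstanceIds=[instance_id],
                DocumentName="AWS-RunShellScript",
                Parameters={"commands": ["sudo growpart /dev/xvdb 1",
                                         "sudo xfs_growfs /mnt"]},
            )

    # Start a resize on the first volume that is still smaller than the target
    for v in volumes:
        if v["Size"] < TARGET_SIZE_GB:
            ec2.modify_volume(VolumeId=v["VolumeId"], Size=TARGET_SIZE_GB)
            return "resizing " + v["VolumeId"]
    return "all volumes are at the target size"
```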

Here’s the main class that will be used by the Lambda function:

Query an HBase table through Hive using PySpark on EMR

In this blog post, I’ll demonstrate how we can access an HBase table through Hive from a PySpark script/job on an AWS EMR cluster. First, I created an EMR cluster (EMR 5.27.0, Hive 2.3.5, HBase 1.4.0). Then I connected to the master node, executed “hbase shell”, created an HBase table, and inserted a sample row:
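For reference, creating a table and inserting a row in the HBase shell looks like this (the table name, column family, and values below are just placeholders):

```
# hbase shell (run on the master node); names and values are placeholders
create 'mytable', 'personal'
put 'mytable', 'row1', 'personal:name', 'John'
scan 'mytable'
```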

I logged in to Hive and created a Hive table that points to the HBase table:
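A Hive table over an HBase table is defined with the HBase storage handler; a minimal example matching the placeholder HBase table above would look like this (table names and the column mapping are illustrative):

```sql
-- HiveQL; table names and the column mapping are illustrative
CREATE EXTERNAL TABLE myhivetable (rowkey STRING, name STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,personal:name")
TBLPROPERTIES ("hbase.table.name" = "mytable");
```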

When I tried to access the table using spark.table(‘myhivetable’), I got an error saying that the org.apache.hadoop.hive.hbase.HBaseStorageHandler class was not found. I tried to use the “--packages” parameter to get the required JAR library from the Maven repository. It downloaded a lot of missing JARs, but it did not work. So I downloaded the required JAR file using wget and copied it to Spark’s JAR directory:
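The commands would be along these lines (the hive-hbase-handler version and the Spark JAR path are assumptions based on EMR 5.27 with Hive 2.3.5; verify them on your cluster):

```bash
# Version and paths are assumptions for EMR 5.27 (Hive 2.3.5); adjust as needed
wget https://repo1.maven.org/maven2/org/apache/hive/hive-hbase-handler/2.3.5/hive-hbase-handler-2.3.5.jar
sudo cp hive-hbase-handler-2.3.5.jar /usr/lib/spark/jars/
```

With the handler JAR in place, a PySpark session with Hive support should be able to read the table:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.table("myhivetable").show()
```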

Amazon QLDB and the Missing Command Line Client

Amazon Quantum Ledger Database (QLDB) is a fully managed ledger database that tracks all changes to user data and maintains a verifiable history of those changes over time. It was announced at AWS re:Invent 2018 and is now available in five AWS regions: US East (N. Virginia), US East (Ohio), US West (Oregon), Europe (Ireland), and Asia Pacific (Tokyo).

You may ask why you would use QLDB (a ledger database) instead of your traditional database solution. We all know that it’s possible to create history tables for our fact tables and keep them up to date using triggers, stored procedures, or even our application code (by writing changes of the main table to its history table). You can also say that your database has write-ahead/redo logs, so it’s possible to track and verify the changes to all your data as long as you keep them in your archive. On the other hand, it’s clear that this creates extra workload and complexity for the database administrator and the application developer, while it still does not guarantee that the data is intact and reliable. What if your DBA directly modifies the data and history tables after disabling the triggers, and even alters the archived logs? You may say it’s too hard, but you know that it’s technically possible. In a legal dispute or a security compliance investigation, this might be enough to question the integrity of the data.

QLDB solves this problem with a cryptographically verifiable journal. When an application needs to modify data in a document, the changes are logged into the journal files first (the write-ahead log concept). The difference here is that each block is hashed (SHA-256) for verification and has a sequence number that specifies its address within the journal. QLDB calculates this hash value using the content of the journal block and the hash value of the previous block, so the journal blocks are chained by their hash values! QLDB users do not have access to the journal, and the journal is immutable. In any case, if someone modifies the data, they also need to update the journal blocks related to that data. This would cause a new hash to be generated for the modified journal block, and all the following blocks would have different hash values than before.
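To make the chaining idea concrete, here is a toy illustration in Python (this shows only the general hash-chaining concept, not QLDB’s actual journal format or hashing scheme):

```python
import hashlib
import json

def block_hash(content: dict, previous_hash: bytes) -> bytes:
    # A block's hash depends on its own content AND the previous block's hash
    payload = json.dumps(content, sort_keys=True).encode() + previous_hash
    return hashlib.sha256(payload).digest()

previous = b"\x00" * 32                       # genesis: no previous block
journal = []
for seq, change in enumerate([{"id": 1, "balance": 100},
                              {"id": 1, "balance": 80}]):
    h = block_hash(change, previous)
    journal.append({"seq": seq, "change": change, "hash": h.hex()})
    previous = h

# Tampering with the first change would alter its hash, and therefore
# the hash of every block that follows it.
```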

Sample AWS Lambda Function to Monitor Oracle Database

I wrote a very simple AWS Lambda function to demonstrate how to connect to an Oracle database, gather tablespace usage information, and send these metrics to CloudWatch. First, I wrote this Lambda function in Python, and then I had to rewrite it in Java. As you may know, you need the cx_Oracle module to connect to Oracle databases from Python. This extension module requires some libraries that are shipped with the Oracle Database Client (oh God!). It’s a little bit tricky to package it for AWS Lambda. The good thing is, I found a great document that explains all the necessary steps.
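A minimal sketch of that Python version is below (the connection details, the metric namespace, and the query against DBA_TABLESPACE_USAGE_METRICS are illustrative, not the original code):

```python
import boto3
import cx_Oracle  # requires the Oracle client libraries packaged with the function

# Placeholder connection details
DSN = cx_Oracle.makedsn("dbhost.example.com", 1521, service_name="ORCLPDB1")

def lambda_handler(event, context):
    cloudwatch = boto3.client("cloudwatch")
    with cx_Oracle.connect(user="monitor", password="secret", dsn=DSN) as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT tablespace_name, used_percent FROM dba_tablespace_usage_metrics"
        )
        # One datapoint per tablespace, with the tablespace name as a dimension
        metric_data = [
            {
                "MetricName": "TablespaceUsedPercent",
                "Dimensions": [{"Name": "TablespaceName", "Value": name}],
                "Value": float(used_percent),
                "Unit": "Percent",
            }
            for name, used_percent in cursor
        ]
    if metric_data:
        # Publish to a custom namespace (the name is arbitrary)
        cloudwatch.put_metric_data(Namespace="OracleMonitoring", MetricData=metric_data)
    return "published %d metrics" % len(metric_data)
```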

Here’s the main class that will be used by the Lambda function: