In my previous post, I mentioned that Oracle Big Data Cloud Service – Compute Edition now comes with Zeppelin 0.7, and version 0.7 does not include a Hive interpreter. This means we can no longer use “%hive” blocks to run queries against Apache Hive. Instead, we can use the JDBC interpreter (“%jdbc” blocks) or Spark SQL (“%sql” blocks).
The JDBC interpreter lets you create a JDBC connection to any data source. It has been tested with popular RDBMS and NoSQL databases such as Postgres, MySQL, Amazon Redshift and Apache Hive. To be able to connect to a data source, we first need to define it in Zeppelin’s interpreter settings. Normally, we access Zeppelin through the Big Data Cloud – Compute Edition console, which hides the menu that leads to the interpreter settings, but we can easily bypass the console with a little trick. After opening a notebook in the console, take the URL we are connected to, remove the “?#notebook/XXXXX” part, and append “/zeppelinui/”, so the URL looks like “https://bigdataconsoleip:1080/zeppelinui/”. This is the address of Zeppelin’s native user interface.
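The URL rewrite above is just string surgery. Here is a quick Python sketch of the same steps, using a made-up console address and reusing the placeholder notebook id from the text:

```python
# Hypothetical console URL; "bigdataconsoleip" and "XXXXX" are placeholders.
console_url = "https://bigdataconsoleip:1080/?#notebook/XXXXX"

# Drop the "?#notebook/XXXXX" part and append "/zeppelinui/".
base = console_url.split("?#notebook/")[0].rstrip("/")
zeppelin_ui = base + "/zeppelinui/"

print(zeppelin_ui)  # https://bigdataconsoleip:1080/zeppelinui/
```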
On this page, we can use the drop-down menu in the upper-right corner to reach the interpreters page, where we can search for interpreters, edit their settings and restart them. For now, we don’t need to change anything: Hive is already defined in our cloud service, so we can use the JDBC interpreter to connect to Hive.
To test the new Zeppelin, let’s create a new notebook and start working with Hive. I’ll use the MovieLens database (as I did before). I assume that you have already read my previous blog post about Zeppelin, or are at least familiar with Zeppelin notebooks. In any case, here’s the shell script to download and extract the data files:
```
%sh
cd /tmp/
rm ml-latest.zip
rm -rf /tmp/ml-latest
wget http://files.grouplens.org/datasets/movielens/ml-latest.zip
unzip ml-latest.zip
```
To run Hive queries over JDBC, we add “%jdbc(hive)” as the first line of the block. The JDBC interpreter supports multiple queries in the same block, so we can write all our queries and run them together, separating them with semicolons as we do in other clients. The following block will create temporary tables, load data from the CSV files, and then create the main tables in ORC format:
```
%jdbc(hive)
DROP TABLE IF EXISTS ratings_tmp;

CREATE TABLE ratings_tmp (
  userId STRING,
  movieId STRING,
  rating STRING,
  rating_timestamp STRING )
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
TBLPROPERTIES("skip.header.line.count"="1");

DROP TABLE IF EXISTS movies_tmp;

CREATE TABLE movies_tmp (
  movieId STRING,
  title STRING,
  genres STRING )
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
TBLPROPERTIES("skip.header.line.count"="1");

LOAD DATA LOCAL INPATH '/tmp/ml-latest/ratings.csv' OVERWRITE INTO TABLE ratings_tmp;

LOAD DATA LOCAL INPATH '/tmp/ml-latest/movies.csv' OVERWRITE INTO TABLE movies_tmp;

CREATE TABLE movies STORED AS ORC AS
SELECT
  CAST(movieid AS BIGINT) movieid,
  CASE WHEN title LIKE '%, The %'
       THEN CONCAT( 'The ', regexp_replace( title, ', The ', ' ' ))
       ELSE title END title,
  genres
FROM movies_tmp;

CREATE TABLE ratings STORED AS ORC AS
SELECT
  userid,
  CAST(movieid AS BIGINT) movieid,
  CAST(rating AS DECIMAL) rating,
  CAST(rating_timestamp AS BIGINT) rating_timestamp
FROM ratings_tmp;
```
You can find an explanation of the queries above in my previous blog post.
The names of the tables are self-explanatory. Using these tables, I can run the following block to get the highest-rated movies (those with at least 20,000 votes) and their genres. This time I’ll use a Spark SQL (“%sql”) block instead of “%jdbc(hive)”. The query I used:
```
%sql
SELECT m.title, AVG(r.rating) avg_rating, COUNT(*) total_votes, m.genres
FROM movies AS m, ratings AS r
WHERE m.movieid = r.movieid
GROUP BY m.movieid, m.title, m.genres
HAVING COUNT(*) >= 20000
ORDER BY avg_rating DESC
LIMIT 10
```
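If you want to see what this aggregation does without a cluster at hand, here is a minimal pure-Python sketch of the same GROUP BY / HAVING / ORDER BY logic, on made-up sample ratings and with the vote threshold lowered from 20,000 to 2:

```python
from collections import defaultdict

# Made-up (movieid, rating) pairs standing in for the ratings table
ratings = [("m1", 5.0), ("m1", 4.0), ("m2", 3.0)]

sums = defaultdict(lambda: [0.0, 0])  # movieid -> [rating sum, vote count]
for movieid, rating in ratings:
    sums[movieid][0] += rating
    sums[movieid][1] += 1

# HAVING COUNT(*) >= 2, ORDER BY avg_rating DESC
top = sorted(
    ((mid, s / c, c) for mid, (s, c) in sums.items() if c >= 2),
    key=lambda t: t[1],
    reverse=True,
)
print(top)  # [('m1', 4.5, 2)]
```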
There’s no lookup table for genres, so let’s play with Spark. Of course I’ll use PySpark (the Spark Python API) to generate a lookup table and then find the top movies for each genre. Here’s the PySpark code to find the genres:
```
%pyspark

genres = []

result_list = spark.sql("SELECT genres FROM movies").rdd.collect()

for row in result_list:
    parsed = row["genres"].split('|')
    genres.extend( parsed )

result = set(genres)

print result
```
This is very similar to a word count script. Let me explain it line by line:
Line 3) Define an empty list to store the genres
Line 5) Query the movies table and store the result in result_list
Line 7) Loop over each item in result_list
Line 8) Parse the movie’s genres (i.e. ‘Comedy|Adventure’ -> ‘Comedy’, ‘Adventure’)
Line 9) Add the parsed list to our genres list
Line 11) Make the list unique (what a simple way to do it!)
Line 13) Print the result
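The same parse-and-deduplicate logic can be tried outside Zeppelin. Here is a self-contained sketch, with a few made-up sample rows standing in for the Spark result set:

```python
# Made-up rows standing in for spark.sql("SELECT genres FROM movies").rdd.collect()
sample_rows = [
    {"genres": "Comedy|Adventure"},
    {"genres": "Comedy|Drama"},
    {"genres": "Adventure"},
]

genres = []
for row in sample_rows:
    parsed = row["genres"].split('|')  # 'Comedy|Adventure' -> ['Comedy', 'Adventure']
    genres.extend(parsed)

result = set(genres)  # deduplicate in one step
print(sorted(result))  # ['Adventure', 'Comedy', 'Drama']
```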
You can run it and see the result. I could add one more command to turn the result into a temporary table, but I thought I could write it in a more Spark-ish way:
```
%pyspark

def parseGenres(row):
    return row["genres"].split('|')

result = set(spark.sql("SELECT genres FROM movies").rdd.flatMap(parseGenres).collect())

spark.createDataFrame( [Row(i) for i in result], ["genres"] ).createOrReplaceTempView("genres")
```
Let me explain this block line by line:
Line 3) Define a function that will be applied to every row
Line 4) Split the genres string on the ‘|’ character
Line 6) Query the movies table, apply the parsing function with flatMap, and make the result unique
Line 8) Using the items of the result set, generate a data frame and register it as the “genres” temporary table
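flatMap itself is easy to emulate in plain Python (map each element to a list, then flatten). The following sketch mimics the block above on made-up rows, using itertools.chain for the flattening step:

```python
from itertools import chain

def parseGenres(row):
    # Same parsing function as in the notebook block
    return row["genres"].split('|')

# Made-up rows standing in for spark.sql("SELECT genres FROM movies")
sample_rows = [{"genres": "Comedy|Adventure"}, {"genres": "Comedy|Drama"}]

# flatMap equivalent: map each row to a list of genres, flatten, deduplicate
result = set(chain.from_iterable(parseGenres(row) for row in sample_rows))
print(sorted(result))  # ['Adventure', 'Comedy', 'Drama']
```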
Now we can run the following query to find the top movies for each genre (we’ll be able to use genres like a regular table in our query):
```
%sql
SELECT genres, title, avg_rating, total_votes
FROM (
  SELECT g.genres, m.title, AVG(r.rating) avg_rating, COUNT(*) total_votes,
    ROW_NUMBER() OVER ( PARTITION BY g.genres ORDER BY AVG(r.rating) DESC ) AS rank
  FROM movies AS m, ratings AS r, genres g
  WHERE m.movieid = r.movieid
    AND INSTR( m.genres, g.genres ) >= 1
  GROUP BY g.genres, m.movieid, m.title
  HAVING COUNT(*) >= 10000 )
WHERE rank = 1
ORDER BY genres
```
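The ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) part simply picks the single best row per genre. A tiny pure-Python equivalent, on made-up pre-aggregated rows, would be:

```python
# Made-up (genres, title, avg_rating) rows standing in for the inner query
rows = [
    ("Comedy", "Movie A", 4.2),
    ("Comedy", "Movie B", 4.5),
    ("Drama",  "Movie C", 4.1),
]

# Keep only the highest-rated movie per genre (rank = 1)
best = {}
for genres, title, avg_rating in rows:
    if genres not in best or avg_rating > best[genres][1]:
        best[genres] = (title, avg_rating)

print(best)  # {'Comedy': ('Movie B', 4.5), 'Drama': ('Movie C', 4.1)}
```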
Here’s the result. In the previous version of Zeppelin shipped with BDCSCE, there was a bug with rounding numbers in data grids; as you can see, it’s fixed in this version.