PySpark Examples #5: Discretized Streams (DStreams)

This is the fourth blog post in which I share sample scripts from my presentation about “Apache Spark with Python“. Spark supports two different ways of streaming: Discretized Streams (DStreams) and Structured Streaming. DStreams is the basic abstraction in Spark Streaming: a continuous sequence of RDDs representing a stream of data. Structured Streaming is the newer way of streaming, built on the Spark SQL engine. In the next blog post, I’ll also share a sample script about Structured Streaming, but today I will demonstrate how we can use DStreams.

When you search for example scripts about DStreams, you mostly find sample code that reads data from TCP sockets. So I decided to write a different one: my sample code will read from files located in a directory. The script will check the directory every second and process the new CSV files it finds. Here’s the code:
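The original listing is not embedded in this text, so below is a minimal sketch reconstructed from the step-by-step explanation that follows (the source directory /tmp/stream comes from the text; the variable names are my own assumptions, and the blank lines are placed so the line numbers match the explanation):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext()
    ssc = StreamingContext(sc, 1)

    points = ssc.textFileStream("/tmp/stream")
    parsed = points.map(lambda line: line.split(","))  # split each CSV line into fields

    parsed.pprint()

    ssc.start()

    ssc.awaitTermination()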

Here is the step-by-step explanation of the code:
Line 1) Each Spark application needs a Spark Context object to access Spark APIs, so we start by importing SparkContext.
Line 2) For DStreams, I import StreamingContext.
Line 4) Then I create a Spark Context object (as “sc”). If you run this code in the PySpark shell, you can skip importing SparkContext and creating the sc object, because SparkContext is already defined.
Line 5) I create a Streaming Context object. The second parameter indicates the batch interval (1 second) for processing streaming data.
Line 7) Using textFileStream, I set the source directory for streaming, and create a DStream object.
Line 8) This simple map function parses each line of the CSV files (splitting on commas).
Line 10) This is the action command for the DStream object; the pprint method prints the content of each batch.
Line 12) Starts the streaming process.
Line 14) Waits until the script is terminated manually.

Every second, the script will check the “/tmp/stream” folder; if it finds a new file, it will process the file and write the output. For example, if we put a file containing the following data into the folder:
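(The original sample file is not reproduced here. As a purely illustrative stand-in, suppose the file contains name,score pairs like these:)

    jordan,23
    pippen,12
    rodman,7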

The script will print:
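(Again illustrative, based on the made-up file above: pprint prints a timestamp header followed by the elements of the batch, and since each line was split into a list, the output looks roughly like this. The timestamp depends on when the batch runs.)

    -------------------------------------------
    Time: 2018-01-01 10:00:01
    -------------------------------------------
    ['jordan', '23']
    ['pippen', '12']
    ['rodman', '7']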

pprint is a perfect function to debug your code, but you will probably want to store the streaming data in an external target (such as a database or an HDFS location). The DStream object’s foreachRDD method can be used for this. Here’s another script that saves the streaming data to JSON files:
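As with the first script, the original listing is not embedded here; the following is a minimal sketch reconstructed from the explanation below (the points_json folder and the column names come from the text, while the exact paths and variable names are assumptions; blank lines again make the line numbers match):

    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.streaming import StreamingContext

    sc = SparkContext()
    spark = SparkSession(sc)
    ssc = StreamingContext(sc, 1)

    points = ssc.textFileStream("/tmp/stream")
    parsed = points.map(lambda line: line.split(","))

    def saveresult(rdd):
        if not rdd.isEmpty():
            df = rdd.toDF(["name", "score"])
            df.write.mode("append").json("points_json")

    parsed.foreachRDD(saveresult)

    ssc.start()

    ssc.awaitTermination()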

Here is the step-by-step explanation of the code:
Line 1) Each Spark application needs a Spark Context object to access Spark APIs, so we start by importing SparkContext.
Line 2) Because I’ll use DataFrames, I also import SparkSession.
Line 3) For DStreams, I import StreamingContext.
Lines 5-6) I create a Spark Context object (as “sc”) and a Spark Session object (based on the Spark Context). If you run this code in the PySpark shell, you can skip these lines.
Line 7) I create a Streaming Context object. The second parameter indicates the batch interval (1 second) for processing streaming data.
Line 9) Using textFileStream, I set the source directory for streaming, and create a DStream object.
Line 10) This simple map function parses each line of the CSV files (splitting on commas).
Line 12) I define a function that accepts an RDD as a parameter.
Line 13) This function will be called every second, even if there’s no streaming data, so I check that the RDD is not empty.
Line 14) I convert the RDD to a DataFrame with the columns “name” and “score”.
Line 15) I write the data to the points_json folder as JSON files.
Line 17) I assign the saveresult function to process the streaming data.
Line 19) Starts the streaming process.
Line 21) Waits until the script is terminated manually.

After storing all this data in JSON format, we can run a simple script to query it:
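That script is also not reproduced here; a minimal sketch, assuming the streaming job wrote its JSON files to the points_json folder used above, could look like this:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # read all the JSON files produced by the streaming job
    df = spark.read.json("points_json")

    # the scores were written as strings, so cast them before aggregating
    df.createOrReplaceTempView("points")
    spark.sql("SELECT name, SUM(CAST(score AS INT)) AS total_score "
              "FROM points GROUP BY name ORDER BY total_score DESC").show()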

In my previous blog posts, I already explained similar code, so I won’t explain it step by step. See you in the next blog post, about Spark Structured Streaming.


Gokhan Atil is a database administrator who has hands-on experience with both RDBMS and NoSQL databases, and a strong background in software development. He is an Oracle Certified Professional (OCP) and was named an Oracle ACE (in 2011) and an Oracle ACE Director (in 2016) for his continuous contributions to the Oracle user community.

1 Comment

  1. Rubavathy

Hi, I have a problem parsing CSV files from the streaming folder when a file has a “,” inside some of the fields; it makes the parsing inaccurate. Can you please suggest how to handle this?
