
Implementing Windows

In the previous segment, you learnt about window operations in Spark Streaming. Now, let’s apply that knowledge and implement window operations in a Spark Structured Streaming program.

Let’s now look at the window functions in more detail.

Since we have to read from a file, we removed the portion of the code that was used to read from the socket. Our data file contains three fields, and we defined our schema to match them.

Example

Python
from pyspark.sql.types import StructType

mySchema = StructType().add("activity_type","string").add("activity_time","timestamp").add("activity_count","integer")

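As a plain-Python illustration (not Spark code) of what the three schema fields represent, the snippet below parses one record into the corresponding Python types; the sample values are taken from the data file shown later in this segment.

```python
from datetime import datetime

# One record from the data file, with the three fields declared in the schema
record = {"activity_type": "CPU",
          "activity_time": "2020-08-15T05:34:45",
          "activity_count": 5}

# The schema maps each field to a type: string, timestamp, integer
parsed = {
    "activity_type": str(record["activity_type"]),
    "activity_time": datetime.fromisoformat(record["activity_time"]),
    "activity_count": int(record["activity_count"]),
}
```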

We created a window duration of 1 day, while our sliding duration was set to 1 hour. Because the slide is shorter than the window, the windows overlap, and each event falls into 24 (1 day / 1 hour) windows.

Example

Python
from pyspark.sql.functions import window, asc
from pyspark.sql.functions import sum as sum_  # avoid shadowing Python's built-in sum

windowDF = lines.groupBy(window("activity_time","1 day","1 hour")).agg(sum_("activity_count").alias("Events_Sum")).orderBy(asc("window"))

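To see why a 1-day window with a 1-hour slide produces overlapping windows, here is a plain-Python sketch (no Spark required) that computes every window containing a given event time. The helper function and its assumption that windows are aligned to the Unix epoch are illustrative, not Spark's internal code.

```python
from datetime import datetime, timedelta

def assigned_windows(event_time, window_dur, slide):
    """Return (start, end) of every window containing event_time,
    mimicking how overlapping windows are assigned to an event."""
    epoch = datetime(1970, 1, 1)
    # index of the last slide boundary at or before the event
    last_start_idx = (event_time - epoch) // slide
    n_overlaps = window_dur // slide  # 24 for a 1-day window sliding every hour
    windows = []
    for i in range(n_overlaps):
        start = epoch + (last_start_idx - i) * slide
        if start <= event_time < start + window_dur:
            windows.append((start, start + window_dur))
    return windows

# The first event from the sample data file
wins = assigned_windows(datetime(2020, 8, 15, 5, 34, 45),
                        timedelta(days=1), timedelta(hours=1))
# The event contributes to 24 overlapping 1-day windows
```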

As we were using aggregations, we changed our output mode to Complete.

Example

Python
query = windowDF.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()  # keep the application running until the stream is stopped

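To illustrate what the complete output mode means, the plain-Python sketch below (the names `running` and `process_batch` are invented for illustration) mimics an engine that keeps running sums as state and re-emits the entire result table on every micro-batch, not just the rows that changed.

```python
running = {}  # window -> running sum, standing in for the engine's state store

def process_batch(events):
    """Fold one micro-batch of (window_key, count) pairs into the state,
    then, as in complete mode, emit the whole result table."""
    for window_key, count in events:
        running[window_key] = running.get(window_key, 0) + count
    return dict(running)

batch1 = process_batch([("05:00-06:00", 5), ("05:00-06:00", 6)])
batch2 = process_batch([("06:00-07:00", 7)])
# batch2 still contains the unchanged "05:00-06:00" window
```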

Below, you will find the code used in the video above. Make sure you prepare the players.csv file in the required directory before you run the code.

Note:

You can create the data file used in this segment yourself by following the format below and entering your own data into a JSON file. You just need to change the values of each entry, especially activity_time. Also, activity_type can be one of CPU, Memory, IO or Network.

Example

JSON
{"activity_type":"CPU","activity_time":"2020-08-15T05:34:45","activity_count":5}
{"activity_type":"Memory","activity_time":"2020-08-15T05:34:46","activity_count":6}
{"activity_type":"IO","activity_time":"2020-08-15T05:54:46","activity_count":3}
{"activity_type":"Network","activity_time":"2020-08-15T06:14:42","activity_count":7}
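If you create the file yourself, note that Spark's JSON file source expects one self-contained JSON object per line (JSON Lines), with no trailing commas between lines. A quick stdlib-only sanity check of the sample data might look like this:

```python
import json

# Sample data in JSON Lines form: one JSON object per line, no trailing commas
sample = """{"activity_type":"CPU","activity_time":"2020-08-15T05:34:45","activity_count":5}
{"activity_type":"Memory","activity_time":"2020-08-15T05:34:46","activity_count":6}
{"activity_type":"IO","activity_time":"2020-08-15T05:54:46","activity_count":3}
{"activity_type":"Network","activity_time":"2020-08-15T06:14:42","activity_count":7}"""

# Each line must parse on its own; a trailing comma would make this fail
records = [json.loads(line) for line in sample.splitlines()]

allowed = {"CPU", "Memory", "IO", "Network"}
assert all(r["activity_type"] in allowed for r in records)
```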


Let’s move to the next segment, where you will learn about late-arriving data.
