In the previous segment, you learnt about window operations in Spark Streaming. Now, let’s apply that knowledge and implement window operations in a Spark Structured Streaming program.
Let’s walk through the window functions in more detail.
Since we are now reading from a file rather than a socket, the portion of the code that read from the socket has been removed. Our data file has three fields, which we declared in the schema as follows.
Example
mySchema = StructType().add("activity_type", "string").add("activity_time", "timestamp").add("activity_count", "integer")
We set the window duration to 1 day and the sliding duration to 1 hour, so each event contributes to 24 overlapping windows.
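To build intuition for what a sliding window does before running it in Spark, here is a plain-Python sketch (not Spark code) that computes which windows a single event timestamp falls into, assuming windows are aligned to the epoch as Spark does by default:

```python
from datetime import datetime, timedelta

def sliding_windows(event_time, window=timedelta(days=1), slide=timedelta(hours=1)):
    """Return (start, end) pairs of every sliding window containing event_time.

    Window starts are aligned to multiples of the slide duration,
    mirroring Spark's default epoch alignment.
    """
    epoch = datetime(1970, 1, 1)
    offset = (event_time - epoch) % slide
    start = event_time - offset          # most recent window start
    windows = []
    while start > event_time - window:   # every start whose window still covers the event
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)

# An event at 05:34:45 belongs to the windows starting at 05:00, 04:00, ...
wins = sliding_windows(datetime(2020, 8, 15, 5, 34, 45))
# a 1-day window sliding every hour places each event into 24 windows
```

This is why a single record in the data file increments the sum in 24 different window rows of the result table.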
Example
windowDF = lines.groupBy(window("activity_time", "1 day", "1 hour")).sum("activity_count").alias("Events_Sum").orderBy(asc("window"))
Since the query performs an aggregation, we changed the output mode to Complete, which writes the entire updated result table on every trigger.
Example
query = windowDF.writeStream.outputMode("complete").format("console").start()
Below you will find the code used in the video above. Make sure you place the players.csv file in the required directory before you run the code.
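Putting the snippets above together, a consolidated sketch of the program might look like the following. This assumes a SparkSession named `spark`; the input directory path is a placeholder you should replace with your own:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("WindowedActivity").getOrCreate()

# Schema for the three fields in the data file
mySchema = StructType() \
    .add("activity_type", "string") \
    .add("activity_time", "timestamp") \
    .add("activity_count", "integer")

# Stream files from a directory (path is a placeholder)
lines = spark.readStream.schema(mySchema).json("path/to/input/dir")

# 1-day windows sliding every hour, summing the activity counts
windowDF = lines.groupBy(window("activity_time", "1 day", "1 hour")) \
    .sum("activity_count").alias("Events_Sum") \
    .orderBy(asc("window"))

# Complete mode: the whole result table is printed on every trigger
query = windowDF.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()
```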
Note:
You can create the data file used in this segment yourself: follow the format below and input your own data into a JSON file, one record per line. You only need to change the values of each entry, especially activity_time. The activity_type can be one of CPU, Memory, IO, or Network.
Example
{"activity_type":"CPU","activity_time":"2020-08-15T05:34:45","activity_count":5}
{"activity_type":"Memory","activity_time":"2020-08-15T05:34:46","activity_count":6}
{"activity_type":"IO","activity_time":"2020-08-15T05:54:46","activity_count":3}
{"activity_type":"Network","activity_time":"2020-08-15T06:14:42","activity_count":7}
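Spark's JSON source expects line-delimited JSON: one complete object per line, with no trailing commas. A quick plain-Python sanity check of your data file (shown here against an in-memory sample rather than a real file path) might look like this:

```python
import io
import json

# Stand-in for open("your_data_file.json"); each line must parse on its own
sample = io.StringIO(
    '{"activity_type":"CPU","activity_time":"2020-08-15T05:34:45","activity_count":5}\n'
    '{"activity_type":"Memory","activity_time":"2020-08-15T05:34:46","activity_count":6}\n'
)

records = [json.loads(line) for line in sample]

# Every record must carry the three fields declared in the schema
assert all(
    {"activity_type", "activity_time", "activity_count"} <= r.keys()
    for r in records
)
```

If any line fails to parse, Spark would route it to the `_corrupt_record` column instead of your schema fields, so this check can save a confusing debugging session.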
Let’s move to the next segment, where you will learn about late-arriving data.