Additional Resources

In this segment, you will learn how to use three Spark Structured Streaming concepts: UDFs, watermarks and the file sink. These concepts will help you while working on the project.

UDF

User-defined functions (UDFs) are functions that users create and modify to fit their particular requirements. Let’s see how you can use a UDF in a Spark Structured Streaming application.

Let’s assume that you are working on a list of quotes by famous people. So, the basic code for reading a set of quotes may look like this:

Example

Python

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession \
   .builder \
   .appName("udfExample") \
   .getOrCreate()

# Name of the columns
columns = ["Seqno", "Person", "Quote"]

# The quotes
data = [("1", "Albert Einstein",
"Imagination is more important than knowledge."),
       ("2", "Walt Disney", 
"All our dreams can come true, if we have the courage to pursue them."),
       ("3", "Aristotle", 
"We are what we repeatedly do. Excellence, then, is not an act, but a habit."),
       ("4", "Audrey Hepburn", 
"Nothing is impossible. The word itself says 'I'm Possible'!"),
       ("5", "Henry Ford", 
"Whether you think you can, or you think you can't, you're probably right."),
       ("6", "Paulo Coelho", 
"When you want something, all the universe conspires in helping you to achieve it.")]

# Create and display the data frame
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)

Output

At this point, the output shows three columns named ‘Seqno’, ‘Person’ and ‘Quote’, as they are present in the dataset.

Now, suppose you need to calculate the number of words in each quote. You know a good way of doing that in Python, but PySpark does not offer a single built-in function for this calculation. In such cases, you can use a UDF. Writing a UDF in PySpark is generally done in two phases:

  • Phase 1: Writing the utility Python function, which contains the logic for the UDF
  • Phase 2: Converting the Python function to UDF

You can define the Python function as shown below.

Example

Python
import re

# Utility function that counts the words in a text string
def get_word_count(text):
   return len(re.findall(r'\w+', text))

Output
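
Because get_word_count is plain Python, it can be checked on its own before it is wrapped as a UDF. The sketch below runs it on two of the quotes from the dataset. Note that the pattern \w+ treats an apostrophe as a separator, so contractions such as "can't" are counted as two tokens; whether that is acceptable depends on how you define a word.

```python
import re

def get_word_count(text):
    # Count runs of word characters (letters, digits, underscore)
    return len(re.findall(r'\w+', text))

einstein = "Imagination is more important than knowledge."
ford = "Whether you think you can, or you think you can't, you're probably right."

print(get_word_count(einstein))  # 6
print(get_word_count(ford))      # 15: "can't" -> "can", "t"; "you're" -> "you", "re"
print(len(ford.split()))         # 13 when splitting on whitespace instead
```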

Next, you can convert this Python function to a UDF as given below.

Example

Python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Converting function to UDF
wordCountUDF = udf(lambda t: get_word_count(t), IntegerType())

# Alternatively, you can also use this syntax:
# wordCountUDF = udf(get_word_count, IntegerType())

Output

Now, it is time to use the UDF to add a new column named ‘Word Count’.

Example

Python
df.withColumn("Word Count", wordCountUDF(df.Quote)) \
   .show(truncate=False)

Output

After you run the program, you can see that the new column ‘Word Count’ gets added to the dataset.
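
One caveat worth sketching: Spark passes a SQL NULL to a Python UDF as None, and re.findall raises a TypeError when given None. The sample data has no null quotes, so this is only a concern if your own data can contain them (an assumption). The function name get_word_count_or_none below is hypothetical and not part of the original program.

```python
import re

def get_word_count_or_none(text):
    # Spark hands SQL NULL to a Python UDF as None; pass it through unchanged
    if text is None:
        return None
    return len(re.findall(r'\w+', text))

print(get_word_count_or_none(None))                      # None
print(get_word_count_or_none("Nothing is impossible."))  # 3
```

It can be wrapped in the same way as before, for example udf(get_word_count_or_none, IntegerType()); a None return value becomes a null in the ‘Word Count’ column.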

The complete program with all the import statements is provided below.

Example

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import re

# Utility function that counts the words in a text string
def get_word_count(text):
   return len(re.findall(r'\w+', text))

# Initialize Spark session
spark = SparkSession \
   .builder \
   .appName("udfExample") \
   .getOrCreate()

# Name of the columns
columns = ["Seqno", "Person", "Quote"]

# The quotes
data = [("1", "Albert Einstein",
"Imagination is more important than knowledge."),
       ("2", "Walt Disney", 
"All our dreams can come true, if we have the courage to pursue them."),
       ("3", "Aristotle", 
"We are what we repeatedly do. Excellence, then, is not an act, but a habit."),
       ("4", "Audrey Hepburn", 
"Nothing is impossible. The word itself says 'I'm Possible'!"),
       ("5", "Henry Ford", 
"Whether you think you can, or you think you can't, you're probably right."),
       ("6", "Paulo Coelho", 
"When you want something, all the universe conspires in helping you to achieve it.")]

df = spark.createDataFrame(data=data,schema=columns)

# Converting function to UDF
wordCountUDF = udf(get_word_count, IntegerType())

# Add new column using UDF
df.withColumn("Word Count", wordCountUDF(df.Quote)) \
   .show(truncate=False)

Output

Watermark

You have already learnt the concept of watermarks in Spark Structured Streaming. Now, let’s see how to use one in code.

Suppose you are working on a stream of network events where you receive requests from various devices. You have been tasked with calculating the request count per device model over a sliding time window that is 10 minutes long and slides every five minutes.

Now, as there is a huge load on the system, there can be plenty of cases where an event arrives a little late. So, you can put a cap stating that an event can be at most 10 minutes late. You can use a watermark to handle this specific scenario.

Example

Output

File Sink

Once the calculation is done, you can write the data into various sinks. In this exercise, you will write the data into a file sink. Let’s see how to do that in code. For writing the data into a file sink, you must specify the following:

  • Format: It can be json, csv, parquet, orc or text.
  • Output Mode: The file sink supports only ‘append’ mode, which is also the default. The ‘complete’ and ‘update’ modes are available for other sinks but cannot be used with a file sink.
  • Path: The path to the output directory where the data files will be written.
  • Checkpoint Location: The path to the checkpoint directory. This checkpoint location has to be a path in an HDFS-compatible file system.

The corresponding code segment is as follows:

Example

Python
aggregatedDataStream.writeStream \
   .format("csv") \
   .outputMode("append") \
   .option("path", "path/to/destination/dir") \
   .option("checkpointLocation", "path/to/checkpoint/dir") \
   .trigger(processingTime="1 minute") \
   .start() \
   .awaitTermination()

Output
