
Transformation Operations

In the previous segment, you were introduced to transformation operations on an RDD. A transformation operation works on an RDD and stores the result in a new RDD. RDDs are immutable, i.e., they cannot be changed. However, certain operations can be performed on their elements, and the result of those operations can be stored in a new RDD.

In this segment, you will go through various operations on RDDs and find out what each of these operations does. You will also write code to gain a better understanding of each transformation operation. In the next video, let’s hear from Vishwa Mohan on transformation operations.

Note:

Please note that in this module, you may sometimes see the kernel mentioned as Python 2 instead of PySpark. This is because some of these videos are older, and the Python 2 kernel already had the PySpark libraries installed. With the current EMR configuration, you will need to use the PySpark kernel only. The SME might also say “EC2 instance” where “EMR instance” is meant in our case (at the most basic level, EMR instances make use of EC2 instances with additional configurations).

In this video, you looked at various operations. Let’s first understand how a lambda function is used with map().
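Here is a minimal sketch of map() with a lambda; it assumes the same doubling logic as the named-function example that follows.

Example

Python
# Create an RDD from a Python list and double each element using a lambda
rdd1 = sc.parallelize([1, 2, 3, 4, 5])

rdd1.map(lambda x: x * 2).collect()

Output

PowerShell
[2, 4, 6, 8, 10]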

Instead of using a lambda, you can also define a named function and pass it to the map() operator, as in the example below.

Example

Python
def abc(x):
    # Double the input value
    return x * 2

rdd1 = sc.parallelize([1, 2, 3, 4, 5])

# Apply abc() to every element of the RDD and collect the results
rdd1.map(abc).collect()

Output

PowerShell
[2, 4, 6, 8, 10]

Now, look at each operator closely with a different example.

filter(): This operation filters the contents of an RDD based on a condition, keeping only the elements for which the condition is true. Let’s consider the following example.

We are using a text file (document) that contains the following three sentences, with a blank line after the first one:

Apache Spark™ is a unified analytics engine for large-scale data processing.

It is an open-source distributed-computing engine.

Spark provides a productive environment for data analysis because of its lightning speed and support for various libraries.

You can create this file using the vi editor on your EMR instance and then push it to the livy directory.

Now, load the data into an RDD.

Example

Python
# Load the text file 'document' into an RDD; each line becomes one element
rdd1 = sc.textFile("document")

rdd1.collect()

Now, let’s take a look at the output of this operation, as given below.

Output

PowerShell
['Apache Spark is a unified analytics engine for large-scale data processing.',
 '',
 'It is an open-source distributed-computing engine.',
 'Spark provides a productive environment for data analysis because of its lightning speed and support for various libraries.']

Let’s use the filter operation to remove empty lines.

Example

Python
# Keep only the non-empty lines
lines = rdd1.filter(lambda x: x != "")

lines.collect()

Let’s take a look at the output of this operation, as given below.

Output

PowerShell
['Apache Spark is a unified analytics engine for large-scale data processing.',
 'It is an open-source distributed-computing engine.',
 'Spark provides a productive environment for data analysis because of its lightning speed and support for various libraries.']

You can see four elements in ‘rdd1’, where each element stores one line of the document. The filter() operation removed the empty element, so you are left with only those lines that contain at least one character.
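As an alternative (a small sketch, not from the video), you could filter on the length of each line after stripping whitespace; this would also remove lines that contain only spaces.

Example

Python
# Keep only lines that still have content after stripping leading/trailing whitespace
lines = rdd1.filter(lambda x: len(x.strip()) > 0)

lines.collect()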

map(): When applied to an RDD, this method returns a new RDD obtained by applying the given function to each element of the original RDD. Let’s consider the following example.

Example

Python
#You have already built lines RDD in the filter example.

words = lines.map(lambda x:x.split(" "))

words.collect()

Let’s take a look at the following output of this code.

Output

PowerShell
[['Apache', 'Spark', 'is', 'a', 'unified', 'analytics', 'engine', 'for', 'large-scale', 'data', 'processing.'],
 ['It', 'is', 'an', 'open-source', 'distributed-computing', 'engine.'],
 ['Spark', 'provides', 'a', 'productive', 'environment', 'for', 'data', 'analysis', 'because', 'of', 'its', 'lightning', 'speed', 'and', 'support', 'for', 'various', 'libraries.']]

On each element of the RDD, you applied the operation split(" "). This operation splits a line at every occurrence of a space. The number of elements in the RDD after the map() operation always remains the same; the difference is that each element is now a list of words. Consider the second element of the RDD.

Example

Python
['It', 'is', 'an', 'open-source', 'distributed-computing', 'engine.']

This is one element of the RDD ‘words’: the list of words obtained by splitting the second line during the map() operation.

flatMap(): This is another operation similar to map(), but the number of elements in the output can be different from the number of elements in the input. Let’s consider the following example.

Example

Python
#You have already built lines RDD in the filter example.

words = lines.flatMap(lambda x:x.split(" "))

words.collect()

Let’s take a look at the output of this operation, as given below.

Output

PowerShell
['Apache', 'Spark', 'is', 'a', 'unified', 'analytics', 'engine', 'for', 'large-scale', 'data', 'processing.',
 'It', 'is', 'an', 'open-source', 'distributed-computing', 'engine.',
 'Spark', 'provides', 'a', 'productive', 'environment', 'for', 'data', 'analysis', 'because', 'of', 'its',
 'lightning', 'speed', 'and', 'support', 'for', 'various', 'libraries.']

In this RDD, every word is now a different element.
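To see the difference between map() and flatMap() concretely, you can compare element counts. The counts in the comments below are a sketch that assumes the three-line ‘lines’ RDD built earlier.

Example

Python
# map() keeps one element per input line; flatMap() flattens the lists into individual words
lines.count()                                    # 3
lines.map(lambda x: x.split(" ")).count()        # 3 (each element is a list of words)
lines.flatMap(lambda x: x.split(" ")).count()    # 35 (each element is a single word)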

distinct(): This operation identifies the unique elements in an RDD and returns them in a new RDD. Let’s consider the following example.

Suppose you have to find the total number of distinct words in a document.

Example

Python
words = lines.flatMap(lambda x:x.split(" "))

words.distinct().count()

Let’s take a look at the output of this code, as given below.

Output

PowerShell
29

sorted(): This is not an operation on an RDD but Python’s built-in function for sorting the elements of a list. Let’s consider the following example:

Example

Python
rdd1 = sc.parallelize([1,5,1,3,2,3,5])

sorted(rdd1.distinct().collect())

Let’s take a look at the output of this operation, as given below.

Output

PowerShell
[1, 2, 3, 5]

As you can see, you applied the sorted() method on the result of the collect() operation. collect() returns a Python list containing all the elements of the RDD, and sorted() sorts this list. Since duplicates were already removed by distinct(), the result is the sorted list of unique elements.

You have now seen some basic operations on a single RDD. In the next video, let’s take a look at some operations performed on two RDDs.

Let’s take a closer look at each operation.

union(): This operation will work on two RDDs and will result in an output that contains all the elements present in both the RDDs (duplicates are not removed).

“rdd1.union(rdd2)”
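Here is a quick sketch using two small illustrative RDDs (these values are not from the video).

Example

Python
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([3, 4, 5, 6])

# union() keeps every element from both RDDs, including the duplicates 3 and 4
rdd1.union(rdd2).collect()

Output

PowerShell
[1, 2, 3, 4, 3, 4, 5, 6]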

intersection(): This operation will work on two RDDs and will result in an output that contains only those elements that are present in both the RDDs.

“rdd1.intersection(rdd2)”
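Continuing with the same illustrative rdd1 ([1, 2, 3, 4]) and rdd2 ([3, 4, 5, 6]) from the union sketch above:

Example

Python
# intersection() returns only the elements common to both RDDs
rdd1.intersection(rdd2).collect()   # [3, 4] (the order of elements may vary)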

subtract(): This operation will work on two RDDs and will result in an output that contains all the elements present in rdd1 but not those present in rdd2.

“rdd1.subtract(rdd2)”
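Again with the same illustrative RDDs:

Example

Python
# subtract() keeps the elements of rdd1 that do not appear in rdd2
rdd1.subtract(rdd2).collect()   # [1, 2] (the order of elements may vary)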

cartesian(): This operation will work on two RDDs and will result in an output that contains pairs of each element of rdd1 with each element of rdd2.

“rdd1.cartesian(rdd2)”

Let’s take a look at the following example for cartesian().

document1 = [1,2,3]

document2 = [4,5,6]

sc.parallelize(document1).cartesian(sc.parallelize(document2)).collect()

This code will result in the following output.

[(1, 4), (1, 5), (1, 6), (2, 4), (2, 5), (2, 6), (3, 4), (3, 5), (3, 6)]

In this segment, you learnt about various operators that are used to manipulate data stored in an RDD. In the next segments, you will look at various action operations on RDDs.

Additional Content

Transformation in RDD: Link to a list of some common transformations in the official Spark RDD programming documentation.
