You have already learnt what RDDs are and how they optimise data storage by partitioning data across various executors. In this segment, you will learn how to create RDDs in PySpark. But before that, we will discuss a crucial topic related to SparkSession and SparkContext. In the following video, our expert Kautak will discuss this topic in detail.
Note:
Please note that in this module, you may sometimes see the kernel mentioned as Python 2 instead of PySpark. This is because some of these videos are older, and the Python 2 kernel already had the PySpark libraries installed. For the current configuration of EMR, you will need to use the PySpark kernel only. The SME might also mention an EC2 instance instead of an EMR instance, which is what applies in our case (at the most basic level, EMR instances make use of EC2 instances with additional configurations).
Following is the notebook used to explain RDD Operations.
Before Spark 2, a different context was required to plug in each functionality of Spark.
- To access RDDs, broadcast variables, accumulators, the parallelize method and other core features, you used SparkContext.
- To access Hive functionalities, you used HiveContext.
- To access SQL functionalities, you used SQLContext.
However, with the introduction of SparkSession in Spark 2, all these functionalities came under a single entry point. You can access all the Spark APIs and features using SparkSession.
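For example, a single SparkSession object exposes both the RDD API (through its sparkContext attribute) and the SQL API (through its sql() method). The snippet below is a minimal sketch; the application name used here is only an illustration.
Example
from pyspark.sql import SparkSession

# A single SparkSession is the entry point for every Spark API
spark = SparkSession.builder.appName('unified-demo').master('local').getOrCreate()

# RDD API, via the underlying SparkContext
rdd = spark.sparkContext.parallelize([1, 2, 3])

# SQL API, via the same session
spark.sql("SELECT 'hello' AS greeting").show()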
Let’s take a look at the following code, which is used to declare a SparkContext object.
Note:
Here, we are using a SparkSession object to create the SparkContext.
Example
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()
sc = spark.sparkContext
Output
Note:
Please note that for EMR notebooks, the Spark Context is declared when the PySpark kernel is first triggered in the notebook and you don’t need to create it separately like in the code snippet above. The concepts mentioned above are applicable if you are running Spark on your local machine or on a separate EC2 instance without EMR notebooks.
You can establish a connection between a job and the Spark environment by initialising SparkContext. Using SparkConf, you can configure SparkContext to select the cluster mode and also provide a unique name to the Spark application that runs your driver program.
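If you are setting up the SparkContext yourself (for example, on a local installation), a minimal sketch using SparkConf could look like the following; the application name and master URL are illustrative assumptions.
Example
from pyspark import SparkConf, SparkContext

# Set the application name and the cluster mode (here: local with two cores)
conf = SparkConf().setAppName('demo-app').setMaster('local[2]')

# Reuse an existing SparkContext if one is already running, otherwise create one
sc = SparkContext.getOrCreate(conf=conf)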
The code for creating a SparkSession is provided below.
Example
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()
spark
Output
In this session, the SparkContext object will be used to access the RDD API of Spark.
Let’s consider the following code and find out the purpose of each function.
Example
spark = SparkSession \
    .builder \
    .appName('Creating RDD') \
    .master('local') \
    .getOrCreate()
Output
Method description:
SparkSession.builder: The entry point in pyspark.sql for constructing a SparkSession; it returns a builder object on which the options below are set.
.appName(‘Creating RDD’): It sets a name for the application, which will be shown in the Spark web UI.
.master(‘local’): It sets the Spark master URL, such as “local” to run locally, “local[4]” to run locally with four cores and “spark://master:7077” to run on a Spark standalone cluster.
.getOrCreate(): It gets an existing SparkSession or, if none exists, creates a new one based on the options set in this builder.
You will also learn about Spark UI in the following modules.
Pro Tip:
- ‘\’ at the end of a line means that the code continues on the next line.
- If you download Spark and use the Spark shell, ‘sc’ for SparkContext and ‘spark’ for SparkSession are already available and can be used directly, as shown in the sketch below.
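For instance, in a PySpark shell you could type the following without any builder code; the sample values are illustrative.
Example
# 'sc' and 'spark' are pre-created in the PySpark shell; no builder code is needed
sc.parallelize([1, 2, 3]).collect()      # returns [1, 2, 3]
spark.range(3).show()                    # displays a small DataFrame containing 0, 1 and 2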
Now that you have understood the initial setup required for working with Spark, let’s watch the next video where Vishwa Mohan will discuss the methods through which an RDD can be created.
To summarise, one way to create an RDD is to initialise it directly with values using the parallelize() method, which is called on the ‘sc’ object. Let’s take a look at the syntax below:
Example
rdd1 = sc.parallelize(["Rahul","Shikhar","Rohit","Sourav","Irfan"])
Output
Since RDDs are stored in different partitions in various executors, you can use the following operation to check the number of partitions for the RDD that you just made.
Example
rdd1.getNumPartitions()
Output
This call returns a number indicating how many partitions Spark created for ‘rdd1’ by default (when no partition count is specified, Spark uses its default parallelism, which in local mode is typically the number of available cores).
While creating an RDD, you can use the following code to set the number of partitions.
Example
rdd1 = sc.parallelize(["Rahul","Shikhar","Rohit","Sourav","Irfan"], 3)
Output
Now, you can check the number of partitions using the getNumPartitions() method on ‘rdd1’.
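Since three partitions were requested above, the check would look like this (a small sketch; the comment shows the expected value):
Example
rdd1.getNumPartitions()   # returns 3, the value passed to parallelize()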
Finally, to display the elements of ‘rdd1’, you can use the collect() method.
Example
rdd1.collect()
Output
Pro Tip:
The collect() method sends all the values of an RDD back to the driver node. If the RDD is very large, this can result in an ‘Out of Memory’ error on the driver; see the sketch below for a safer alternative when you only need a few elements.
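If you only want to inspect a handful of elements, take() returns just the first n elements to the driver; a minimal sketch using the ‘rdd1’ created earlier:
Example
# Fetch only the first two elements instead of collecting the entire RDD
rdd1.take(2)   # e.g. ['Rahul', 'Shikhar']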
In the next video, we will discuss how to save an RDD.
To summarise, the elements in each partition of an RDD are stored as a separate file in the folder whose name is provided to the saveAsTextFile() method. There are certain points to consider while using this method, which are as follows:
- The name provided in the function must be a folder. Since many files will be created to save an RDD, all those files will be stored in this folder.
- The folder will be created in HDFS and can be accessed from the hadoop user using this command: hadoop fs -ls /user/livy.
- You can also pass an HDFS location to store the files in a specified location.
Example
rdd1.saveAsTextFile('/user/livy/output01')
Output
Note
In the above case, the ‘output01’ folder (the last folder mentioned in the path) will be created by Spark in HDFS and must not already exist.
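To verify the result, you could list the output folder from a terminal on the cluster; a sketch of what it typically contains (one part file per partition, plus a _SUCCESS marker):
Example
hadoop fs -ls /user/livy/output01
# Typical contents:
#   _SUCCESS
#   part-00000
#   part-00001
#   part-00002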
You have learnt how to create an RDD using the parallelize method on SparkContext. Now, you will learn how to create an RDD from a text file. Let’s watch the next video to learn the same.
As illustrated by Kautak in the video above, you can use the textFile() method to create an RDD from a text file. This method loads the file and puts each line in the file as an element in the RDD.
Suppose you have a file that contains the following text.
“Apache Spark™ is a unified analytics engine for large-scale data processing.
It is an open-source distributed-computing engine.
Spark provides a productive environment for data analysis because of its lightning speed and support for various libraries.”
Each line in this file becomes one element of the RDD; even lines that contain no characters (blank lines) are counted as elements.
Pro Tip:
The file that you want to read from must be present in HDFS. Use WinSCP to put the document into the hadoop user's home directory, and then use the following command to move the text file from there into HDFS:
Example
hadoop fs -put "filename" /user/livy/<folder name if any>
Output
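Once the file is available in HDFS, you could load it into an RDD with textFile(); a minimal sketch, where the path and file name are assumptions:
Example
# Each line of the text file becomes one element of the RDD
rdd2 = sc.textFile('/user/livy/spark_intro.txt')
rdd2.collect()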
The two ways to create RDDs are summarised below:
- parallelize(): This method is used when data is present in the Spark driver program.
- textFile(): This method is used when data has to be loaded from external file storage systems.
Due to their large size, data sets are mostly present in external storage systems instead of in memory. Hence, the textFile() method is used more often than the parallelize() method.
Another important point to note here is that you have created a SparkContext from a SparkSession object.
Example
sc = spark.sparkContext
Output
You could have created an RDD using the following command.
Example
rdd1 = spark.sparkContext.parallelize(["This", "is", "pyspark", "module"])
Output
If you run ‘rdd1.collect()’, you will get the following output:
Example
["This","is","pyspark","module"]
Output
Pro Tip:
Create an RDD and use the type() function to verify that the data structure is an RDD. You can use this function to find the type of any data structure used in a PySpark program.
Example
type(rdd1)
Output
In this segment, you learnt about the creation of RDDs. In the next segments, you will understand various operations on RDDs.
Additional Reading:
- Spark official RDD Programming Guide: This is the official Spark 2.3.0 documentation for RDD programming.