In this segment, we will go through a simple case study involving data on incidents reported at various police stations across San Francisco. We will use what we have learnt in the previous segments to analyze this dataset and also learn various new operations that are useful for querying a dataframe.
To get the dataset used in this video, you can follow the instructions given below:
- Connect to your EMR instance from your machine.
- Run the following command to download the dataset into your Hadoop directory.
Example
wget https://police-incidents-dataset-ds.s3.amazonaws.com/Police_Department_Incident_Reports__2018_to_Present.csv
Once this is done, use the following command to copy the dataset from the Hadoop directory into HDFS.
Example
hadoop fs -put Police_Department_Incident_Reports__2018_to_Present.csv /user/livy
Now, Spark can directly access this dataset in HDFS.
The Jupyter notebook used for analyzing the police case study using dataframes and Spark SQL is:
Let us hear from Vishwa and Kautak in this video, where they introduce the dataset and perform some basic operations on it using dataframe methods.
Note
Please note that in this module, you may sometimes see the kernel mentioned as Python 2 instead of PySpark. This is because some of these videos are older, and the Python 2 kernel already had the PySpark libraries installed. For the current configuration of EMR, you will need to use the PySpark kernel only. The SME might also mention an EC2 instance instead of an EMR instance, which is what we use in our case (at the most basic level, EMR instances make use of EC2 instances with additional configurations).
We have seen that StructType is very useful in applying your schema to the loaded dataset. The ‘header’ option is used to detect the column names in a dataframe. We can use operations such as df.printSchema() and df.show() to verify that the data is properly loaded into the correct data types.
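For reference, here is a minimal sketch of how such a load might look, assuming a SparkSession named spark. Only a few of the column names used later in this segment are shown; the actual file has many more columns, and a StructType passed to spark.read must describe every column of the CSV in order.
Example
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("police_case_study").getOrCreate()

# Illustrative subset of the schema; a complete schema must list every column in the file, in order
schema = StructType([
    StructField("Incident_DateTime", StringType(), True),
    StructField("Incident_id", IntegerType(), True),
    StructField("Incident_Category", StringType(), True),
])

file1 = (spark.read
         .option("header", True)   # use the first row of the CSV as column names
         .schema(schema)           # apply the explicit schema instead of inferring types
         .csv("/user/livy/Police_Department_Incident_Reports__2018_to_Present.csv"))

file1.printSchema()   # verify the column names and data types
file1.show(5)         # verify that a few rows were loaded correctly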
Let us understand some of the analytical queries used in this video:
Example
file1.select('Incident_id', 'Incident_Category').show(5)
Code Description
This code shows the Incident_id and Incident_Category columns for the first 5 rows.
- select(): It is used to select particular columns.
- show(): It is used to show the rows in a dataframe. If not specified, it shows only the first 20 rows.
Example
file1.select('Incident_Category').distinct().show(truncate = False)
Code Description
- This code shows the distinct incident categories (the first 20 rows, since no row count is specified).
- truncate: This argument specifies whether long strings should be cut off; setting it to False displays the full Incident Category string.
Example
file1.select('Incident_Category').groupBy('Incident_Category').count().orderBy("count", ascending = False).show(52, False)
Code Description:
This code shows all the 52 incident categories present in the dataset along with the number of incidents reported in each category.
- groupBy('Incident_Category'): It groups the same incident categories together so that some aggregation can be applied to the grouped data.
- groupBy('Incident_Category').count(): Once we have grouped the data according to the incident category column, we count the number of rows in each group. The result gives us the number of incidents reported in each incident category.
- groupBy('Incident_Category').count().orderBy("count", ascending = False): count() creates a column that contains the number of incidents reported in each category. We can then apply the orderBy operation on this column. The count is ordered such that the incident category with the highest number of incidents appears first. (The sketch after this list breaks the same chain into intermediate steps.)
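If it helps to see this chain one step at a time, here is the same aggregation split into intermediate dataframes. This is only a sketch, assuming 'file1' has been loaded as shown earlier; the intermediate names are illustrative.
Example
categories = file1.select('Incident_Category')          # keep only the column we need
grouped = categories.groupBy('Incident_Category')       # one group per distinct category
counted = grouped.count()                               # adds a 'count' column with the group sizes
ordered = counted.orderBy('count', ascending = False)   # most frequent categories first
ordered.show(52, False)                                  # 52 rows, without truncating the strings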
Now, in this video, Vishwa Mohan discusses the columns that store DateTime data.
One of the important features introduced in the video is converting fields that store DateTime as a string into the timestamp data type. For that purpose, we have used a method called withColumn(). Let us have a look at some of the methods and their basic syntax:
Renaming a column
Example
# Rename the column
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
Dropping a column
Example
# Remove the column
dfdrop = df.drop("ORIGIN_COUNTRY_NAME").columns
Changing the column type
Example
# Changing column types
df.withColumn("count2", col("count").cast("Int"))Output
‘count2‘ is a new column that will be created alongside the existing count column and will contain the same values cast to the new data type.
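As a quick illustration of the difference, here is a small sketch using the same df (assuming col has been imported from pyspark.sql.functions); 'df_int' and 'df_int2' are just illustrative names. It shows how to either overwrite the existing column in place or create the new column and drop the original.
Example
from pyspark.sql.functions import col

# Option A: overwrite the existing column by reusing its name
df_int = df.withColumn("count", col("count").cast("int"))

# Option B: create count2 with the new type and then drop the original column
df_int2 = df.withColumn("count2", col("count").cast("int")).drop("count")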
Now, let us look at the code used in the video:
Example
pattern1 = 'yyyy/MM/dd hh:mm:ss aa'
Code Description
Here, we are storing the pattern of the DateTime string used in the dataset. Presently, the incident DateTime column has a string data type. aa: specifies am or pm.
Example
file2 = file1.withColumn('Incident_DateTime', unix_timestamp(file1['Incident_DateTime'], pattern1).cast('timestamp'))
Code Description
- file2: Since dataframes are immutable, we store the result in a new dataframe named ‘file2’.
- unix_timestamp(): The first argument is the string value that has DateTime format and the second argument is the pattern of the DateTime column.
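A quick way to confirm that the conversion worked is to check the schema of the 'file2' dataframe created above; this is a small sketch of that check.
Example
# Incident_DateTime should now appear as timestamp instead of string
file2.printSchema()
file2.select('Incident_DateTime').show(5, False)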
There are various other columns that have DateTime data. Let us hear it from Vishwa Mohan about those columns in this video.
Now that we have discussed loading the dataset and running some basic queries on it, let us watch this video where Vishwa Mohan runs some more queries on this dataset.
Let us look at the queries used in this video:
Analysis 1
Find the days of the week on which maximum incidents have happened.
We already have a column named ‘day of week’ in our dataset, but let us use the Incident_DateTime column for our query.
Example
file2.select(dayofweek("Incident_DateTime")).show(7)
Since the data type of the Incident_DateTime column is timestamp, we can use the dayofweek() method on it. It will give us the day of the week as a number for each incident.
But for our analysis, we need the day of the week as a string. For this, we use the following code:
Example
file2.select(date_format("Incident_DateTime", 'E')).show(7)
‘E’ here is used to get the string format day of the week from the column “Incident_DateTime”. To understand this date_format method, you can use the link in the additional readings.
So, we know how to find the day of the week in string format from the timestamp column “Incident_DateTime”. Let us add a new column named “dayOfTheWeek” to our dataset. Since dataframes are immutable, we add this column to our ‘file2’ dataframe and store the result in a new dataframe ‘file3‘.
Example
# Adding a new column to our dataframe, which adds the day of the week to each record
file3 = file2.withColumn('dayOfTheWeek', date_format("Incident_DateTime", 'E'))
To run the analysis, we use the following query,
# Aggregating based on the day of the week; this will get us the day of the week on which maximum incidents happened
file3.groupBy('dayOfTheWeek').count().orderBy('count', ascending = False).show()
Code Description
We groupBy on the dayOfTheWeek column and count the number of rows in each group. We then order the count column created by the query to show the day on which most incidents happened.
Analysis 2
What percent of the incidents have been recorded online?
We are using file2 that contains DateTime columns in timestamp data type.
Let us first analyze this column “File_Online” using the following code.
Example
file2.select("File_Online").show()Output
As discussed in the video, this is a boolean data type column that contains ‘true’ for cases reported online and ‘null’ for cases reported offline. We could find the percentage of incidents recorded online with this column as it is, but let us see how we can update the column's values.
Example
file3 = file2.withColumn("File_Online" ,when(col("File_Online") == True , True).otherwise(False) )
file3.select('File_Online').show()Output
Code Description
Using withColumn(), we have specified that if the value of “File_Online” is True, it should remain True, but in any other case, it should be ‘False’. Also, as dataframes are immutable, we have stored the result in a new dataframe ‘file3‘.
Now, let us start the analysis of incidents reported online. We first create a new dataframe from ‘file3’ that contains the “File_Online” column along with the count of incidents recorded online as well as offline. This new dataframe is stored in ‘file4’ using the following code.
Example
file4 = file3.select("File_Online").groupBy('File_Online').count()
file4.show()
Since we have to find the percentage, we have to sum all the incidents reported. To do that, we use the window function ‘over(Window.partitionBy())’.
Example
from pyspark.sql.window import Window
file4.withColumn('colnew', col('count') / sum('count').over(Window.partitionBy())).show()
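Since the question asks for a percentage, one way to finish the calculation is to scale this fraction by 100 and round it. This is only a sketch, assuming the 'file4' dataframe from the previous step; 'percent' is just an illustrative column name.
Example
from pyspark.sql.functions import col, sum, round
from pyspark.sql.window import Window

# Scale the fraction to a percentage and round it to two decimal places
file4.withColumn('percent', round(col('count') * 100 / sum('count').over(Window.partitionBy()), 2)).show()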
Analysis 3
Find the number of incidents reported each year in our dataset.
Example
# Group by the numbers of incidents reported based on each Year
incidents_reporter_per_year = file2.select(year('Incident_DateTime')).groupBy('year(Incident_DateTime)').count()
incidents_reporter_per_year.show()
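The generated column name 'year(Incident_DateTime)' is a little awkward to work with. Below is a sketch of the same analysis with a friendlier column name and the years sorted in order; 'incidents_per_year' is just an illustrative name.
Example
from pyspark.sql.functions import year

# alias() renames the derived column so that groupBy and show display a simple 'year' header
incidents_per_year = file2.select(year('Incident_DateTime').alias('year')).groupBy('year').count().orderBy('year')
incidents_per_year.show()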
Some of the other operations that are available on dataframes include:
Union of two Dataframes
Example
unionDF = df1.unionAll(df2)
Filter based on the value
Example
explodeDF.filter(explodeDF.firstName == "xiangrui").sort(explodeDF.lastName)
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
Treatment of null rows
Example
# Drop records with null values
dropNullDF = explodeDF.na.drop()
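Dropping is not the only option. If you would rather keep the rows, nulls can be replaced with default values using na.fill(); here is a sketch using the same explodeDF, where 'fillNullDF' and the default values are illustrative.
Example
# Replace nulls with default values instead of dropping the rows
fillNullDF = explodeDF.na.fill({"salary": 0, "firstName": "unknown"})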
Aggregate function
Example
salarySumDF = explodeDF.agg({"salary" : "sum"})
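The dictionary form above computes one aggregate per column. Using the functions API, several named aggregates can be computed at once; here is a sketch using the same explodeDF, where 'statsDF' and the alias names are illustrative.
Example
from pyspark.sql.functions import sum, avg

# Compute multiple aggregates in a single agg() call
statsDF = explodeDF.agg(sum("salary").alias("total_salary"), avg("salary").alias("average_salary"))
statsDF.show()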
This segment was filled with code demos that included various queries on a police dataset. In the upcoming segments, we will understand how we can run the same queries using Spark SQL.