Police Dataset Case Study

In this segment, we will go through a simple case study based on data about incidents reported at various police stations across San Francisco. We will use what we have learnt in the previous segments to analyze this dataset and also learn several new operations that are useful for querying a dataframe.

To get the dataset used in this video, you can follow the instructions given below:

  • Open the EMR instance on your machine.
  • Run the following command to download the dataset into your hadoop directory.

Example

Shell
wget https://police-incidents-dataset-ds.s3.amazonaws.com/Police_Department_Incident_Reports__2018_to_Present.csv

Output

Once this is done, use the following command to copy the dataset from the hadoop directory to HDFS.

Example

Shell
hadoop fs -put Police_Department_Incident_Reports__2018_to_Present.csv /user/livy

Output

Now, Spark can directly access this dataset in HDFS.

The Jupyter notebook used for analyzing the police case study using dataframes and Spark SQL is:

Let us hear from Vishwa and Kautak in this video, which introduces the dataset and performs some basic operations on it using dataframe methods.

Note

Please note that in this module, you may sometimes see the kernel shown as Python 2 instead of PySpark. This is because some of these videos are older, and the Python 2 kernel already had the PySpark libraries installed. With the current EMR configuration, you need to use the PySpark kernel only. The SME might also mention an EC2 instance instead of an EMR instance; in our case, this should be read as an EMR instance. (At the most basic level, EMR instances make use of EC2 instances with additional configurations.)

We have seen that StructType is very useful for applying your own schema to the loaded dataset. The 'header' option tells Spark to read the column names from the first line of the file. We can use operations such as df.printSchema() and df.show() to verify that the data is properly loaded into the correct data types.

Let us understand some of the analytical queries used in this video:

Example

Python
file1.select('Incident_id', 'Incident_Category').show(5)

Output

Code Description

This code shows the Incident_id and Incident_Category columns for the first 5 rows.

  • select(): It is used to select particular columns.
  • show(): It is used to show the rows in a dataframe. If the number of rows is not specified, it shows only the first 20 rows.

Example

Python
file1.select('Incident_Category').distinct().show(truncate = False)

Output

Code Description

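  • This code shows the distinct values of Incident_Category. Since no row count is passed to show(), at most 20 of them are displayed.
  • truncate: This parameter specifies whether long strings are cut short. With truncate = False, the full Incident Category string appears.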

Example

Python
file1.select('Incident_Category').groupBy('Incident_Category').count().orderBy("count", ascending = False).show(52, False)

Output

Code Description

This code shows all 52 incident categories present in the dataset along with the number of incidents reported in each category.

  • groupBy('Incident_Category'): It groups rows that have the same incident category so that an aggregation can be applied to the grouped data.
  • groupBy('Incident_Category').count(): Once the data is grouped by the incident category column, we count the number of rows in each group. The result gives us the number of incidents reported in each incident category.
  • groupBy('Incident_Category').count().orderBy("count", ascending = False): count() creates a column named count that contains the number of incidents reported in each category. We can then apply the orderBy operation on this column. The rows are ordered so that the incident category with the largest number of incidents appears first.
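To make the groupBy/count/orderBy chain concrete, here is a minimal plain-Python sketch of what it computes. It does not use Spark, and the `categories` list is made-up sample data rather than values taken from the police dataset.

```python
from collections import Counter

# Hypothetical sample values standing in for the Incident_Category column.
categories = [
    "Larceny Theft", "Assault", "Larceny Theft",
    "Burglary", "Larceny Theft", "Assault",
]

# groupBy('Incident_Category').count(): one (category, count) pair per distinct value.
counts = Counter(categories)

# orderBy("count", ascending=False): largest count first.
ordered = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)

for category, count in ordered:
    print(category, count)
```

The category with the most rows ends up at the top, which is the same ordering the dataframe query produces.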

In the next video, Vishwa Mohan discusses the columns that store DateTime data.

One of the important features introduced in the video is converting fields that store DateTime values as strings into the timestamp data type. For that purpose, we have used a method called withColumn(). Let us have a look at some of the related methods and their basic syntax:

Renaming a column

Example

Python
# Rename the column

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

Output

Dropping a column

Example

Python
# Remove the column

dfdrop=df.drop("ORIGIN_COUNTRY_NAME").columns

Output

Changing the column type

Example

Python
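# Changing column types
from pyspark.sql.functions import col

df.withColumn("count2", col("count").cast("int"))

Output

'count2' is a new column that is added alongside the existing count column; it holds the same values cast to the integer data type. To change the type in place instead, pass the existing column name 'count' as the first argument.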

Now, let us look at the code used in the video:

Example

Python
pattern1 = 'yyyy/MM/dd hh:mm:ss aa'

Output

Code Description

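Here, we store the pattern that describes how the DateTime strings are formatted. At this point, the Incident_DateTime column still has the string data type. In the pattern, hh is the hour on a 12-hour clock and aa specifies AM or PM. Newer Spark releases (3.x) may require a single a for this marker.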

Example

Python
from pyspark.sql.functions import unix_timestamp
file2 = file1.withColumn('Incident_DateTime', unix_timestamp(file1['Incident_DateTime'], pattern1).cast('timestamp'))

Output

Code Description

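  • file2: Since dataframes are immutable, we store the result in a new dataframe named 'file2'.
  • unix_timestamp(): The first argument is the string column that holds the DateTime values, and the second argument is the pattern those strings follow. It returns the number of seconds since the Unix epoch, which cast('timestamp') then converts to the timestamp data type.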
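As a rough illustration of what the pattern and the conversion do, the following plain-Python sketch parses one string with the standard library. It is not Spark code: the sample string is hypothetical, the strptime format is a Python analogue of the Spark pattern, and the value is treated as UTC purely for the example (Spark uses its session time zone).

```python
from datetime import datetime, timezone

# Python strptime analogue of the Spark pattern 'yyyy/MM/dd hh:mm:ss aa':
# %I is the 12-hour clock (hh) and %p is the AM/PM marker (aa).
python_pattern = "%Y/%m/%d %I:%M:%S %p"

# Hypothetical sample value shaped like the Incident_DateTime strings.
raw = "2018/01/01 09:30:00 PM"

# String -> datetime, comparable to ending up with a timestamp column.
parsed = datetime.strptime(raw, python_pattern)

# Seconds since 1970-01-01, comparable to what unix_timestamp() returns.
# Assumption for this sketch: the string is interpreted as UTC.
epoch_seconds = int(parsed.replace(tzinfo=timezone.utc).timestamp())

print(parsed)         # 2018-01-01 21:30:00
print(epoch_seconds)  # 1514842200
```

Note how the PM marker turns 09:30 into 21:30; without it in the pattern, afternoon and morning incidents could not be told apart.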

There are various other columns that have DateTime data. Let us hear from Vishwa Mohan about those columns in this video.

Now that we have discussed loading the dataset and running some basic queries on it, let us watch this video, where Vishwa Mohan runs some more queries on this dataset.

Let us look at the queries used in this video:

Analysis 1

Find the days of the week on which maximum incidents have happened.

We already have a column named 'day of week' in our dataset, but let us use the Incident_DateTime column for our query.

Example

Python
file2.select(dayofweek("Incident_DateTime")).show(7)

Output

Since the data type of the Incident_DateTime column is timestamp, we can use the dayofweek() function on it. It gives us the day of the week as a number for each incident, from 1 for Sunday to 7 for Saturday.

But for our analysis, we need the day of the week as a string. For this, we use the following code:

Example

Python
file2.select(date_format("Incident_DateTime",'E')).show(7)

Output

The pattern 'E' here is used to get the abbreviated name of the day of the week (for example, Mon) from the column "Incident_DateTime". To understand this date_format method, you can use the link in the additional readings.
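The difference between the numeric and the string form of the day can be sketched in plain Python. This is only an analogue of the two Spark functions, using a hypothetical sample timestamp, and it assumes the default (English) locale for the day name.

```python
from datetime import datetime

# Hypothetical sample timestamp; 1 January 2018 was a Monday.
incident_time = datetime(2018, 1, 1, 21, 30, 0)

# Spark's dayofweek() numbers days 1 = Sunday ... 7 = Saturday.
# Python's isoweekday() uses 1 = Monday ... 7 = Sunday, so shift it.
spark_style_number = incident_time.isoweekday() % 7 + 1

# date_format(..., 'E') yields the abbreviated day name; %a is the strftime analogue.
day_name = incident_time.strftime("%a")

print(spark_style_number, day_name)  # 2 Mon
```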

So, we know how to get the day of the week as a string from the timestamp column "Incident_DateTime". Let us add a new column named "dayOfTheWeek" to our dataset. Since dataframes are immutable, we add this column to our 'file2' dataframe and store the result in a new dataframe 'file3'.

Example

Python
# Add a new column that holds the day of the week for each record
file3 = file2.withColumn('dayOfTheWeek', date_format("Incident_DateTime", 'E'))

Output

To run the analysis, we use the following query:

Example

Python
# Aggregate by day of the week to find the day on which the most incidents happened
file3.groupBy('dayOfTheWeek').count().orderBy('count', ascending=False).show()

Code Description

We group by the dayOfTheWeek column and count the number of rows in each group. We then sort by the count column created by the query, in descending order, so that the day with the most incidents appears first.

Analysis 2

What percent of the incidents have been recorded online?

We are using file2 that contains DateTime columns in timestamp data type.

Let us first analyze this column “File_Online” using the following code.

Example

Python
file2.select("File_Online").show()

Output

As discussed in the video, this is a boolean column that contains 'true' for cases reported online and 'null' for cases reported offline. We could find the percent of incidents recorded online with this column as it is, but let us first see how we can update the values of this column.

Example

Python
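from pyspark.sql.functions import col, when
# Keep True as True; replace every other value (including null) with False
file3 = file2.withColumn("File_Online", when(col("File_Online") == True, True).otherwise(False))
file3.select('File_Online').show()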

Output

Code Description

Using withColumn(), we have specified that if the value of "File_Online" is True, it should remain True, and in every other case (including null) it should become False. Also, as dataframes are immutable, we have stored the result in a new dataframe 'file3'.
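The when/otherwise rule can be mirrored in a few lines of plain Python. This is only a sketch of the logic on a hypothetical list of values, with None standing in for Spark's null.

```python
# Hypothetical File_Online values: True for online reports, None (null) otherwise.
file_online = [True, None, None, True, None]

# when(col == True, True).otherwise(False): only an actual True stays True;
# null (None) falls through to the otherwise branch and becomes False.
cleaned = [value is True for value in file_online]

print(cleaned)  # [True, False, False, True, False]
```

After this step the column has only two distinct values, which makes the later groupBy produce exactly one row for online and one for offline incidents.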

Now, let us start the analysis of incidents reported online. We first create a new dataframe from 'file3' that contains the "File_Online" column along with the count of incidents recorded online as well as offline. This new dataframe is stored in 'file4' using the following code.

Example

Python
file4 = file3.select("File_Online").groupBy('File_Online').count()

file4.show()

Output

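Since we have to find the percent, we need the total number of incidents reported. To get it, we use 'over(Window.partitionBy())'. Called with no columns, Window.partitionBy() treats the whole dataframe as a single window, so the sum of the count column over this window attaches the grand total to every row.

Example

Python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as spark_sum

# 'colnew' holds each group's share of all incidents as a fraction; multiply by 100 for a percent
file4.withColumn('colnew', col('count') / spark_sum('count').over(Window.partitionBy())).show()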

Output
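The arithmetic behind this query can be sketched in plain Python. The counts below are hypothetical placeholders shaped like 'file4', not results from the dataset.

```python
# Hypothetical grouped counts, shaped like file4: File_Online value -> number of incidents.
counts = {True: 20, False: 80}

# The sum over an unpartitioned window gives the same grand total for every row.
total = sum(counts.values())

# Divide each group's count by the total; multiply by 100 for a percent.
percent = {flag: 100 * count / total for flag, count in counts.items()}

print(percent)  # {True: 20.0, False: 80.0}
```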

Analysis 3

Find the number of incidents reported each year in our dataset.

Example

Python
# Count the number of incidents reported in each year
from pyspark.sql.functions import year

incidents_reported_per_year = file2.select(year('Incident_DateTime')).groupBy('year(Incident_DateTime)').count()

incidents_reported_per_year.show()

Output

Some of the other operations that are available on dataframes include:

Union of two Dataframes

Example

Python
unionDF = df1.unionAll(df2)

Output

Filter based on the value

Example

Python
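from pyspark.sql.functions import asc, col

# Keep rows whose firstName is "xiangrui", sorted by lastName
explodeDF.filter(explodeDF.firstName == "xiangrui").sort(explodeDF.lastName)

# Keep rows whose firstName is "xiangrui" or "michael", sorted by lastName in ascending order
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))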

Output

Treatment of null rows 

Example

Python
#Drop records with null value

dropNullDF = explodeDF.na.drop()

Output

Aggregate function

Example

Python
salarySumDF = explodeDF.agg({"salary" : "sum"})

Output

This segment was filled with code demos that included various queries on a police dataset. In the upcoming segments, we will understand how we can run the same queries using Spark SQL.
