Window Functions in SQL and PySpark (Notebook)

Window functions are something you use almost every day if you are a data engineer. They make complex problems easier to solve and complex operations easier to express.

Let’s dive into how window functions are used and the operations we can perform with them.

Note: Everything below was implemented in Databricks Community Edition. This is not a written article; I am just pasting the notebook here to keep it as a reference for myself going forward.

Databricks Notebook —

# File location and type
file_location = "/FileStore/tables/wage_data.csv"
file_type = "csv"

# CSV options (passed to the reader as strings)
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# The chain must end with .load(file_location): without it `df` is only a
# configured DataFrameReader and no data is ever read.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

Create a view or table from the PySpark DataFrame

# Name of the temporary view that the SQL cells in this notebook query.
# NOTE(review): the call that registers `df` under this name (typically
# df.createOrReplaceTempView(temp_table_name)) is not shown here — confirm it
# was run before the SQL cells.
temp_table_name = "wage_data_csv"

Retrieving Data from the table

-- Preview every row and column of wage_data_csv.
select *
from wage_data_csv

# Name intended for a permanent table; it is the same identifier as the temporary view.
# NOTE(review): no write/save call that uses this name is shown here —
# presumably omitted from the paste; confirm before relying on a saved table.
permanent_table_name = "wage_data_csv"

There are three types of window functions: aggregate, ranking, and value functions.

  1. Aggregate — (AVG, MAX, MIN, SUM, COUNT)

In SQL —


-- 1. Aggregate

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Limitations of GROUP BY with ordinary aggregate functions

-- Distinct values of the three columns used for grouping/partitioning below.
select distinct region
from wage_data_csv

select distinct education
from wage_data_csv

select distinct jobclass
from wage_data_csv

-- Intentionally invalid: `year` is selected but is neither aggregated nor
-- listed in GROUP BY, so this statement throws an error. With GROUP BY alone,
-- returning `year` next to the group average requires a nested query.
select
    region,
    year,
    education,
    jobclass,
    avg(wage)
from wage_data_csv
group by
    region,
    education,
    jobclass
order by avg(wage) desc

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- AVG used as a window function

-- Empty OVER (): the window is the whole table, so every row carries the
-- overall average wage alongside its own columns.
select
    *,
    avg(wage) over () as average_salary
from wage_data_csv

-- Partitioned window: every row carries the average wage of its own
-- (region, education, jobclass) group; no rows are collapsed.
select
    region,
    education,
    jobclass,
    avg(wage) over (partition by region, education, jobclass) as avg_wage
from wage_data_csv

-- Same group average, but keeping every original column.
select
    *,
    avg(wage) over (partition by region, education, jobclass) as avg_wage
from wage_data_csv

-- Adding ORDER BY inside the window turns the result into a running average:
-- each row averages the wages of rows up to and including its own age
-- (rows with the same age share one value).
select
    region,
    education,
    jobclass,
    age,
    avg(wage) over (partition by region, education, jobclass order by age) as avg_wage
from wage_data_csv

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

-- Each query computes one aggregate per (region, education, jobclass) group
-- as a window function; DISTINCT then collapses the repeated rows so that a
-- single row per group remains.

-- Average wage per group
select distinct
    region,
    education,
    jobclass,
    avg(wage) over (partition by region, education, jobclass) as avg_wage
from wage_data_csv

-- Highest wage per group
select distinct
    region,
    education,
    jobclass,
    max(wage) over (partition by region, education, jobclass) as max_wage
from wage_data_csv

-- Number of non-null wages per group
select distinct
    region,
    education,
    jobclass,
    count(wage) over (partition by region, education, jobclass) as count_wage
from wage_data_csv

-- Lowest wage per group
select distinct
    region,
    education,
    jobclass,
    min(wage) over (partition by region, education, jobclass) as min_wage
from wage_data_csv

-- Total wage per group
select distinct
    region,
    education,
    jobclass,
    sum(wage) over (partition by region, education, jobclass) as sum_wage
from wage_data_csv

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

In PySpark —

# Reading data
df = spark.sql("select * from wage_data_csv")

# Aggregate window functions
# NOTE: this import rebinds the names sum/min/max to the Spark column
# functions, shadowing Python's built-ins for the rest of the session.
from pyspark.sql.functions import col,avg,sum,min,max,count
from pyspark.sql import Window

# One unordered window per (region, education, jobclass) group, shared by all
# five aggregates. Leaving out orderBy is deliberate: an ordered window would
# turn avg/sum into running values up to the current wage instead of
# whole-group figures, which would disagree with max/min/count and with the
# SQL version of these queries.
group_window = Window.partitionBy("region", "education", "jobclass")

df = df.withColumn("avg_salary", avg(col("wage")).over(group_window))
df = df.withColumn("max_salary", max(col("wage")).over(group_window))
df = df.withColumn("min_salary", min(col("wage")).over(group_window))
df = df.withColumn("sum_salary", sum(col("wage")).over(group_window))
df = df.withColumn("count_salary_units", count(col("wage")).over(group_window))

# Show the aggregates for a single education level; distinct() removes rows
# that are identical across the selected columns.
df.select("region", "education", "jobclass","wage","avg_salary","max_salary","min_salary","sum_salary","count_salary_units").distinct().filter(df.education=="1. < HS Grad").display()


In SQL —

-- 2. Ranking

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Functionalities:
-- Every query ranks rows by wage within each (region, education, jobclass) partition.

-- row_number(): consecutive integers 1, 2, 3, ... per partition in wage
-- order; tied wages still receive different numbers.
select
    region,
    education,
    jobclass,
    wage,
    row_number() over (partition by region, education, jobclass order by wage)
from wage_data_csv

-- rank(): tied wages share one rank, and the ranks that the tie "used up"
-- are skipped afterwards (1, 2, 2, 4, ...).
select
    region,
    education,
    jobclass,
    wage,
    rank() over (partition by region, education, jobclass order by wage)
from wage_data_csv

-- dense_rank(): tied wages share one rank and no ranks are skipped
-- afterwards (1, 2, 2, 3, ...).
select
    region,
    education,
    jobclass,
    wage,
    dense_rank() over (partition by region, education, jobclass order by wage)
from wage_data_csv

-- percent_rank(): the rank rescaled to a fraction between 0 and 1
-- ((rank - 1) / (rows in partition - 1)); the result is not an integer.
select
    region,
    education,
    jobclass,
    wage,
    percent_rank() over (partition by region, education, jobclass order by wage)
from wage_data_csv

-- ntile(10): splits each partition's ordered rows into 10 buckets of (nearly)
-- equal size and returns the bucket number, 1 through 10.
select
    region,
    education,
    jobclass,
    wage,
    ntile(10) over (partition by region, education, jobclass order by wage)
from wage_data_csv

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

-- Usage

-- Keep only the rows whose wage lies in the upper half of its
-- (region, education, jobclass) partition, i.e. percent_rank above 0.5.
with ranked_wages as (
    select
        region,
        education,
        jobclass,
        wage,
        percent_rank() over (partition by region, education, jobclass order by wage) as pr
    from wage_data_csv
)
select *
from ranked_wages
where pr > 0.5

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

In PySpark —

# Ranking window functions

from pyspark.sql.functions import row_number, rank, dense_rank, ntile, percent_rank
from pyspark.sql import Window

# All rankings share one window: rows grouped by (region, education, jobclass)
# and ordered by wage.
wage_order_window = Window.partitionBy("region", "education", "jobclass").orderBy("wage")

# Output column name -> ranking expression; columns are added in this order.
ranking_exprs = {
    "row_number": row_number(),
    "rank": rank(),
    "dense_rank": dense_rank(),
    "percent_rank": percent_rank(),
    "ntile": ntile(10),
}
for ranking_name, ranking_expr in ranking_exprs.items():
    df = df.withColumn(ranking_name, ranking_expr.over(wage_order_window))

# Show the rankings for a single education level, without duplicate rows.
ranking_preview = (
    df.select(
        "region", "education", "jobclass", "wage",
        "row_number", "rank", "dense_rank", "percent_rank", "ntile",
    )
    .distinct()
    .filter(df.education == "1. < HS Grad")
)
ranking_preview.display()


In SQL —

-- 3. Value Functions

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Functionalities
-- Every query looks at one education level and works within each
-- (region, education, jobclass) partition, ordered by wage.
-- String literals use single quotes so they stay literals even when
-- double-quoted identifiers are enabled.

-- lag(): the wage of the previous row in wage order; NULL for the first row
-- of each partition.
select
    region,
    education,
    jobclass,
    wage,
    lag(wage) over (partition by region, education, jobclass order by wage)
from wage_data_csv
where education = '1. < HS Grad'

-- lead(): the wage of the next row in wage order; NULL for the last row of
-- each partition.
select
    region,
    education,
    jobclass,
    wage,
    lead(wage) over (partition by region, education, jobclass order by wage)
from wage_data_csv
where education = '1. < HS Grad'

-- first_value(): the first wage in wage order, i.e. the lowest wage of the
-- partition, repeated on every row.
select
    region,
    education,
    jobclass,
    wage,
    first_value(wage) over (partition by region, education, jobclass order by wage)
from wage_data_csv
where education = '1. < HS Grad'

-- last_value(): the last wage in wage order, i.e. the highest wage of the
-- partition, repeated on every row. Both the ORDER BY and the explicit frame
-- are required: without ORDER BY "last" is an arbitrary row, and with ORDER BY
-- but the default frame (which ends at the current row) the function would
-- simply return the current row's wage.
select
    region,
    education,
    jobclass,
    wage,
    last_value(wage) over (
        partition by region, education, jobclass
        order by wage
        rows between unbounded preceding and unbounded following
    )
from wage_data_csv
where education = '1. < HS Grad'

-- nth_value(wage, 2): the second wage in wage order. The default frame ends
-- at the current row, so rows whose frame holds fewer than two rows
-- (normally the first row of each partition) get NULL.
select
    region,
    education,
    jobclass,
    wage,
    nth_value(wage, 2) over (partition by region, education, jobclass order by wage)
from wage_data_csv
where education = '1. < HS Grad'

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

In PySpark —

# Analytical window functions
from pyspark.sql.functions import cume_dist, lag, lead
from pyspark.sql import Window

# Shared window: rows grouped by (region, education, jobclass), ordered by wage.
analytic_window = Window.partitionBy("region", "education", "jobclass").orderBy("wage")

# cume_dist: fraction of the partition's rows whose wage is at or below the current row's.
# lag / lead: wage of the previous / next row in the window; null where no such row exists.
df = (
    df.withColumn("cume_dist", cume_dist().over(analytic_window))
    .withColumn("lag", lag("wage").over(analytic_window))
    .withColumn("lead", lead("wage").over(analytic_window))
)

# Show the results for a single education level, without duplicate rows.
analytic_preview = (
    df.select("region", "education", "jobclass", "wage", "cume_dist", "lag", "lead")
    .distinct()
    .filter(df.education == "1. < HS Grad")
)
analytic_preview.display()

