Window Functions in SQL and PySpark (Notebook)
Last Updated on December 12, 2022 by Editorial Team
Author(s): Muttineni Sai Rohith
Window functions are something you use almost every day as a data engineer. They make life at work much easier, helping you solve complex problems and perform complex operations with simple queries.
Let's dive into window functions and the operations we can perform with them.
Note: Everything below was implemented in Databricks Community Edition. This is not a written article; I am just pasting the notebook here to keep it as a reference for myself going forward.
Databricks Notebook
This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. DBFS is the Databricks File System, which lets you store data for querying inside Databricks. This notebook assumes that you already have a file inside DBFS that you would like to read from.
This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.
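For example, a cell in this Python notebook can be switched to SQL with the `%sql` magic; a minimal sketch:
%sql
-- This cell runs as SQL even though the notebook's default language is Python
select 1 as sanity_check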
# File location and type
file_location = "/FileStore/tables/wage_data.csv"
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)
display(df)
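To confirm what inferSchema actually produced, it can help to print the schema (a small optional check, not part of the original notebook):
# Optional sanity check: inspect the column types inferred from the CSV
df.printSchema()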
Create a view or table from the PySpark DataFrame
temp_table_name = "wage_data_csv"
df.createOrReplaceTempView(temp_table_name)
Retrieving data from the table
%sql
select * from `wage_data_csv`
With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
Once saved, this table will persist across cluster restarts and allow various users across different notebooks to query this data.
permanent_table_name = "wage_data_csv"
df.write.format("parquet").saveAsTable(permanent_table_name)
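Once saved, the table can be queried from any notebook; a minimal sketch (assuming the save above succeeded):
# The permanent table lives in the default database. Note: the temp view above
# shares its name and shadows it for unqualified references in this session,
# so we qualify the table name to be explicit.
spark.sql("select count(*) from default.wage_data_csv").show()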
There are three types of window functions:
1. Aggregate (AVG, MAX, MIN, SUM, COUNT)
2. Ranking (ROW_NUMBER, RANK, DENSE_RANK, PERCENT_RANK, NTILE)
3. Value (LEAD, LAG, FIRST_VALUE, LAST_VALUE, NTH_VALUE)
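All three families share the same anatomy: a function applied over a window, where the window is defined by an optional partition and an optional ordering. A minimal PySpark sketch of that shape (the column names come from the wage dataset loaded above):
from pyspark.sql import Window
from pyspark.sql import functions as F

# A window spec: rows are grouped by the partition columns and ordered within each group
w = Window.partitionBy("region", "education", "jobclass").orderBy("wage")

# Any window function is then applied over that spec; rows are kept, not collapsed
df.withColumn("wage_rank", F.rank().over(w)).display()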
1. Aggregate (AVG, MAX, MIN, SUM, COUNT)
In SQL:
%sql
-- 1. Aggregate
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Limitations of GroupBy and normal aggregate functions
select distinct(region) from wage_data_csv
select distinct(education) from wage_data_csv
select distinct(jobclass) from wage_data_csv
select region, year, education, jobclass, avg(wage) from wage_data_csv group by region, education, jobclass order by avg(wage) desc
-- The statement above throws an error: year is selected but is neither grouped nor aggregated. To select year alongside the group averages, we would need a nested query.
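-- A hedged sketch of that workaround: aggregate in a subquery, then join the
-- averages back so year can be selected alongside them
select t.region, t.year, t.education, t.jobclass, g.avg_wage
from wage_data_csv t
join (select region, education, jobclass, avg(wage) as avg_wage
      from wage_data_csv
      group by region, education, jobclass) g
  on t.region = g.region and t.education = g.education and t.jobclass = g.jobclass
order by g.avg_wage desc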
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Aggregate AVG function
select *, avg(wage) over() as average_salary from wage_data_csv
select region, education, jobclass, avg(wage) over( partition by region, education, jobclass) as avg_wage from wage_data_csv
select *, avg(wage) over( partition by region, education, jobclass) as avg_wage from wage_data_csv
select region, education, jobclass, age, avg(wage) over( partition by region, education, jobclass order by age) as avg_wage from wage_data_csv
-- Note: adding an order by inside over() changes the default frame, so this computes a running average by age within each partition rather than one average per partition
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Functionalities:
select distinct region, education, jobclass, avg(wage) over( partition by region, education, jobclass) as avg_wage from wage_data_csv
select distinct region, education, jobclass, max(wage) over( partition by region, education, jobclass) as max_wage from wage_data_csv
select distinct region, education, jobclass, count(wage) over( partition by region, education, jobclass) as count_wage from wage_data_csv
select distinct region, education, jobclass, min(wage) over( partition by region, education, jobclass) as min_wage from wage_data_csv
select distinct region, education, jobclass, sum(wage) over( partition by region, education, jobclass) as sum_wage from wage_data_csv
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
In PySpark:
#Pyspark
#reading data
df = spark.sql("select * from wage_data_csv")
df.display()
# Aggregate Window Functions
from pyspark.sql.functions import col, avg, sum, min, max, count
from pyspark.sql import Window

# One window per (region, education, jobclass) group. Note: adding .orderBy() to this
# spec would turn avg and sum into running aggregates over a growing frame, which is
# not what the SQL above computes.
w = Window.partitionBy("region", "education", "jobclass")
df = df.withColumn("avg_salary", avg(col("wage")).over(w))
df = df.withColumn("max_salary", max(col("wage")).over(w))
df = df.withColumn("min_salary", min(col("wage")).over(w))
df = df.withColumn("sum_salary", sum(col("wage")).over(w))
df = df.withColumn("count_salary_units", count(col("wage")).over(w))
#df.display()
#df.distinct().display()
df.select("region", "education", "jobclass","wage","avg_salary","max_salary","min_salary","sum_salary","count_salary_units").distinct().filter(df.education=="1. < HS Grad").display()
#df.select("region", "education", "jobclass", "avg_salary").display()
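The contrast with a plain groupBy is worth making explicit: groupBy collapses each group to a single output row, while a window aggregate keeps every input row. A minimal sketch of the groupBy equivalent of avg_salary:
# groupBy collapses each (region, education, jobclass) group to one row;
# the window version above instead attaches avg_salary to every original row
df.groupBy("region", "education", "jobclass").agg(avg("wage").alias("avg_wage")).display()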
2. Ranking (ROW_NUMBER, RANK, DENSE_RANK, PERCENT_RANK, NTILE)
In SQL:
-- 2. Ranking
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Functionalities:
select region, education, jobclass, wage, row_number() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- row_number() assigns a sequential integer to each row within its partition, in the order given by the order by column.
select region, education, jobclass, wage, rank() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- rank() assigns integers the same way, but tied rows receive the same rank and the subsequent ranks are skipped.
select region, education, jobclass, wage, dense_rank() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- dense_rank() also gives tied rows the same rank, but the subsequent ranks are not skipped.
select region, education, jobclass, wage, percent_rank() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- percent_rank() scales the rank to a value between 0 and 1, as a percentage of the partition.
select region, education, jobclass, wage, ntile(10) over(partition by region, education, jobclass order by wage) from wage_data_csv
-- ntile(10) divides the rows of each partition into 10 roughly equal, numbered buckets.
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Usage
-- Select records whose wage falls above the 50th percentile of its partition
select * from (select region, education, jobclass, wage, percent_rank() over(partition by region, education, jobclass order by wage) as pr from wage_data_csv) where pr>0.5
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
In PySpark:
# Ranking Functions
from pyspark.sql.functions import row_number, rank, dense_rank, ntile, percent_rank
from pyspark.sql import Window

# One shared window spec: partition by the three categorical columns, order by wage
w = Window.partitionBy("region", "education", "jobclass").orderBy("wage")
df = df.withColumn("row_number", row_number().over(w))
df = df.withColumn("rank", rank().over(w))
df = df.withColumn("dense_rank", dense_rank().over(w))
df = df.withColumn("percent_rank", percent_rank().over(w))
df = df.withColumn("ntile", ntile(10).over(w))
#df.display()
#df.distinct().display()
df.select("region", "education", "jobclass","wage","row_number","rank","dense_rank","percent_rank","ntile").distinct().filter(df.education=="1. < HS Grad").display()
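A common use of row_number is picking exactly one row per group; a minimal sketch that keeps the lowest wage in each partition, reusing the row_number column computed above:
# The window is ordered by wage ascending, so row_number == 1 is the lowest wage per group
df.filter(col("row_number") == 1).select("region", "education", "jobclass", "wage").display()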
3. Value (LEAD, LAG, FIRST_VALUE, LAST_VALUE, NTH_VALUE)
In SQL:
-- 3. Value Functions
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Functionalities
select region, education, jobclass, wage, lag(wage) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- lag(wage) shifts the wage column down by one row within each partition; the first row of each partition gets NULL
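-- A common use of lag (a hedged sketch): the gap between each wage and the previous one
select region, education, jobclass, wage, wage - lag(wage) over(partition by region, education, jobclass order by wage) as wage_gap from wage_data_csv where education = "1. < HS Grad"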
select region, education, jobclass, wage, lead(wage) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- lead(wage) shifts the wage column up by one row within each partition; the last row of each partition gets NULL
select region, education, jobclass, wage, FIRST_VALUE(wage) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- first_value() retrieves the first value of the given column within each partition
select region, education, jobclass, wage, LAST_VALUE(wage) over(partition by region, education, jobclass) from wage_data_csv where education = "1. < HS Grad"
-- last_value() retrieves the last value of the given column within each partition
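-- Note: last_value above deliberately has no order by, so its frame spans the whole
-- partition. With an order by, the default frame ends at the current row and
-- last_value would just return the current wage; a hedged sketch of the explicit-frame fix:
select region, education, jobclass, wage, LAST_VALUE(wage) over(partition by region, education, jobclass order by wage rows between unbounded preceding and unbounded following) from wage_data_csv where education = "1. < HS Grad"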
select region, education, jobclass, wage, nth_value(wage,2) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- nth_value(wage, 2) retrieves the nth (here, the second) value within each partition; rows before the nth row get NULL
-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
In PySpark:
# Value (Analytical) Window Functions
from pyspark.sql.functions import cume_dist, lag, lead
from pyspark.sql import Window

w = Window.partitionBy("region", "education", "jobclass").orderBy("wage")
df = df.withColumn("cume_dist", cume_dist().over(w))
df = df.withColumn("lag", lag("wage").over(w))
df = df.withColumn("lead", lead("wage").over(w))
#df.display()
#df.distinct().display()
df.select("region", "education", "jobclass","wage","cume_dist","lag","lead").distinct().filter(df.education=="1. < HS Grad").display()
I am writing this just as a reference for myself...
I feel my brain is a library handbook that holds references to all the concepts, and on a particular day, if it wants to retrieve more about a concept in detail, it can select the book from the handbook reference and retrieve the data by reading through it.
So that's why I write...
Happy Learning!