Join thousands of AI enthusiasts and experts at the Learn AI Community.

Publication

Latest

Window Functions in SQL and PySpark ( Notebook)

Last Updated on December 12, 2022 by Editorial Team

Author(s): Muttineni Sai Rohith

Originally published on Towards AI the World’s Leading AI and Technology News and Media Company. If you are building an AI-related product or service, we invite you to consider becoming an AI sponsor. At Towards AI, we help scale AI and technology startups. Let us help you unleash your technology to the masses.

Window Functions are something that you use almost every day at work if you are a data engineer. Window functions make life very easy at work. They help in solving some complex problems and help in performing complex operations easily.

let’s just dive into the Window Functions usage and operations that we can perform using them.

Note: Everything Below, I have implemented in Databricks Community Edition. This is not a written article; just pasting the notebook here. To Keep it as a reference for me going forward.

Databricks Notebook —

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. DBFS is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

# File location and type
file_location = "/FileStore/tables/wage_data.csv"
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)
display(df)

Create a view or table from the Pyspark Dataframe

temp_table_name = "wage_data_csv"
df.createOrReplaceTempView(temp_table_name)

Retrieving Data from the table

select * from `wage_data_csv`

With this registered as a temp view, it will only be available to this particular notebook. If you’d like other users to be able to query this table, you can also create a table from the DataFrame.

Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.

permanent_table_name = "wage_data_csv"
df.write.format("parquet").saveAsTable(permanent_table_name)

There are three types of window functions:

  1. Aggregate — (AVG, MAX, MIN, SUM, COUNT)

In SQL —

%sql

-- 1. Aggregate

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Limitations of GroupBy and normal aggregate functions

select distinct(region) from wage_data_csv
select distinct(education) from wage_data_csv
select distinct(jobclass) from wage_data_csv
select region, year, education, jobclass, avg(wage) from wage_data_csv group by region, education, jobclass order by avg(wage) desc
-- Here if we want to select year then we have to use nested queries and then select as above sql statement will throw an error.

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Aggregate AVG function

select *, avg(wage) over() as average_salary from wage_data_csv
select region, education, jobclass, avg(wage) over( partition by region, education, jobclass) as avg_wage from wage_data_csv
select *, avg(wage) over( partition by region, education, jobclass) as avg_wage from wage_data_csv
select region, education, jobclass, age, avg(wage) over( partition by region, education, jobclass order by age) as avg_wage from wage_data_csv

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
--Functionaltites:

select distinct region, education, jobclass, avg(wage) over( partition by region, education, jobclass) as avg_wage from wage_data_csv
select distinct region, education, jobclass, max(wage) over( partition by region, education, jobclass) as max_wage from wage_data_csv
select distinct region, education, jobclass, count(wage) over( partition by region, education, jobclass) as count_wage from wage_data_csv
select distinct region, education, jobclass, min(wage) over( partition by region, education, jobclass) as min_wage from wage_data_csv
select distinct region, education, jobclass, sum(wage) over( partition by region, education, jobclass) as sum_wage from wage_data_csv

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

In Pyspark —

#Pyspark
#reading data
df = spark.sql("select * from wage_data_csv")
df.display()

# Aggregate Window Functions
from pyspark.sql.functions import col,avg,sum,min,max,count
from pyspark.sql import Window

df = df.withColumn("avg_salary", avg(col("wage")).over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("max_salary", max(col("wage")).over(Window.partitionBy("region", "education", "jobclass")))
df = df.withColumn("min_salary", min(col("wage")).over(Window.partitionBy("region", "education", "jobclass")))
df = df.withColumn("sum_salary", sum(col("wage")).over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("count_salary_units", count(col("wage")).over(Window.partitionBy("region", "education", "jobclass")))

#df.display()
#df.distinct().display()
df.select("region", "education", "jobclass","wage","avg_salary","max_salary","min_salary","sum_salary","count_salary_units").distinct().filter(df.education=="1. < HS Grad").display()
#df.select("region", "education", "jobclass", "avg_salary").display()

2. Ranking — (ROW_NUMBER, RANK, DENSE_RANK, PERCENT_RANK, NTILE)

In SQL —


-- 2. Ranking

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Functionalities:

select region, education, jobclass, wage, row_number() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- row_number() adds an integer to the records w.r.t order by column based on partitions.

select region, education, jobclass, wage, rank() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- rank() adds an integers w.r.t order by column based on partitions, but if there is tie same number is assigned and for subsequent records, numbers are skipped.

select region, education, jobclass, wage, dense_rank() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- dense_rank() adds an integers w.r.t order by column based on partitions,if there is tie same number is assigned but for subsequent records, numbers are not skipped.

select region, education, jobclass, wage, percent_rank() over(partition by region, education, jobclass order by wage) from wage_data_csv
-- percent_rank() adds an integers w.r.t order by column based on partitions, Here rank is scaled between 0 and 1 as percentages

select region, education, jobclass, wage, ntile(10) over(partition by region, education, jobclass order by wage) from wage_data_csv
--ntile(10) adds an integers w.r.t order by column based on partitions, Here data is divided into chunks.

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

-- usage

-- to select records with wages more than 50%
select * from (select region, education, jobclass, wage, percent_rank() over(partition by region, education, jobclass order by wage) as pr from wage_data_csv) where pr>0.5

-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

In Pyspark —

# Ranking Functions

from pyspark.sql.functions import row_number, rank, dense_rank, ntile, percent_rank
from pyspark.sql import Window

df = df.withColumn("row_number", row_number().over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("rank", rank().over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("dense_rank", dense_rank().over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("percent_rank", percent_rank().over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("ntile", ntile(10).over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))

#df.display()
#df.distinct().display()
df.select("region", "education", "jobclass","wage","row_number","rank","dense_rank","percent_rank","ntile").distinct().filter(df.education=="1. < HS Grad").display()
#df.select("region", "education", "jobclass", "avg_salary").display()

3. Value — (LEAD, LAG, FIRST_VALUE, LAST_VALUE, NTH_VALUE)

In SQL —

-- 3. Value Functions

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-- Functionalities

select region, education, jobclass, wage, lag(wage) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- Shifts entire wage column down by a value based on partition, first value is none

select region, education, jobclass, wage, lead(wage) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- Shifts entire wage column up by a value based on partition, last value is none based on partition

select region, education, jobclass, wage, FIRST_VALUE(wage) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- is used to retrieve first value from the column passed based on partition

select region, education, jobclass, wage, LAST_VALUE(wage) over(partition by region, education, jobclass) from wage_data_csv where education = "1. < HS Grad"
-- is used to retrieve last value from the column passed based on partition

select region, education, jobclass, wage, nth_value(wage,2) over(partition by region, education, jobclass order by wage) from wage_data_csv where education = "1. < HS Grad"
-- is used to retrieve nth value from the column passed based on partition, above nth value will be none

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

In Pyspark —

# Analytical Window Functions
from pyspark.sql.functions import cume_dist, lag, lead
from pyspark.sql import Window

df = df.withColumn("cume_dist", cume_dist().over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("lag", lag("wage").over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))
df = df.withColumn("lead", lead("wage").over(Window.partitionBy("region", "education", "jobclass").orderBy("wage")))

#df.display()
#df.distinct().display()
df.select("region", "education", "jobclass","wage","cume_dist","lag","lead").distinct().filter(df.education=="1. < HS Grad").display()

I am writing this just as a reference to me…..

I feel my brain is a library handbook that holds references to all the concepts and on a particular day, if it wants to retrieve more about a concept in detail, it can select the book from the handbook reference and retrieve the data by seeing it.

So That’s why I write…

Happy Learning..


Window Functions in SQL and PySpark ( Notebook) was originally published in Towards AI on Medium, where people are continuing the conversation by highlighting and responding to this story.

Join thousands of data leaders on the AI newsletter. It’s free, we don’t spam, and we never share your email address. Keep up to date with the latest work in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming a sponsor.

Published via Towards AI

Feedback ↓