Exploratory Data Analysis (EDA) using PySpark
Author(s): Vivek Chaudhary
Originally published on Towards AI.
The objective of this article is to analyze a dataset and answer some questions to gain insight from the data. We will learn how to connect to an Oracle DB, create a PySpark DataFrame, and perform different operations to understand the various aspects of the dataset.
Exploratory Data Analysis (EDA) is the process of understanding a dataset by summarizing its main characteristics.
As this is my first blog on EDA, I have tried to keep the content simple to make sure I resonate with my readers. So, without wasting another minute, let's get started with the analysis.
1. PySpark connection and application creation
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data_Analysis").getOrCreate()
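If you are running this on a local machine rather than a cluster, you can also pin the master explicitly. A minimal sketch, where the master setting is an assumption about your setup and not part of the original code:
spark = SparkSession.builder.appName("Data_Analysis").master("local[*]").getOrCreate()  #local[*] assumes a local machine, using all available cores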
2. PySpark DB connection and importing datasets
#Import Sales data
sales_df = (spark.read.format("jdbc")
            .option("url", "jdbc:oracle:thin:sh/sh@//localhost:1521/orcl")
            .option("dbtable", "sales")
            .option("user", "sh")
            .option("password", "sh")
            .option("driver", "oracle.jdbc.driver.OracleDriver")
            .load())

#The remaining tables use the same connection options, so wrap the call in a small helper
def read_oracle_table(dbtable):
    return (spark.read.format("jdbc")
            .option("url", "jdbc:oracle:thin:sh/sh@//localhost:1521/orcl")
            .option("dbtable", dbtable)
            .option("user", "sh")
            .option("password", "sh")
            .option("driver", "oracle.jdbc.driver.OracleDriver")
            .load())

#Import Customer, Products, Channels, and Country data
cust_df = read_oracle_table("customers")
prod_df = read_oracle_table("products")
chan_df = read_oracle_table("channels")
country_df = read_oracle_table("countries")
The ojdbc6 driver JAR is required for the Oracle DB connection.
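One way to make that JAR visible to Spark is the spark.jars config at session creation; a minimal sketch, where the JAR path is an assumption you should adjust for your machine:
spark = SparkSession.builder.appName("Data_Analysis").config("spark.jars", "/path/to/ojdbc6.jar").getOrCreate()  #assumed local path to ojdbc6.jar
#alternatively, pass it at submit time: spark-submit --jars /path/to/ojdbc6.jar your_script.py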
3. Display the data
#sales data
sales_df.show(10)
print('Total Records in dataset', sales_df.count())
#customer data
cust_df.show(5)
print("Total Records in dataset", cust_df.count())
#product data
prod_df.show(5)
print("Total Records in dataset", prod_df.count())
#channels data
chan_df.show()
print("Total Records in dataset", chan_df.count())
#Country data
country_df.show(5)
print("Total Records in dataset", country_df.count())
4. Display schema and columns of DataFrame
#dataframe schema
sales_df.printSchema()
#display list of columns
sales_df.columns
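If you also want the data types alongside the column names, dtypes returns them together:
#list of (column_name, data_type) tuples
sales_df.dtypes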
5. Select and filter condition on DataFrame
#select some columns from product dataframe
prod_df.select("PROD_ID",
               "PROD_NAME",
               "PROD_DESC",
               "PROD_STATUS",
               "PROD_LIST_PRICE",
               "PROD_MIN_PRICE",
               "PROD_EFF_FROM",
               "PROD_EFF_TO",
               "PROD_VALID").show(7)
#filter condition with selective columns
country_df.select("COUNTRY_ID",
                  "COUNTRY_ISO_CODE",
                  "COUNTRY_NAME").filter(country_df.COUNTRY_NAME == "India").show()
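Filter conditions can also be combined with & and |; a small sketch chaining two conditions (India's ISO code is "IN" in the SH sample schema, but verify this against your data):
country_df.filter((country_df.COUNTRY_NAME == "India") &
                  (country_df.COUNTRY_ISO_CODE == "IN")).show()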
Typecast the COUNTRY_ID column to convert the Decimal data to Integer data.
from pyspark.sql.types import IntegerType
country_df.select(country_df["COUNTRY_ID"].cast(IntegerType()).alias("COUNTRY_ID"),
                  "COUNTRY_ISO_CODE",
                  "COUNTRY_NAME").filter(country_df.COUNTRY_NAME == "India").show()
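The same cast can be written with the string shorthand "int", which saves the IntegerType import:
country_df.select(country_df["COUNTRY_ID"].cast("int").alias("COUNTRY_ID"),
                  "COUNTRY_ISO_CODE",
                  "COUNTRY_NAME").filter(country_df.COUNTRY_NAME == "India").show()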
6. GroupBy and Aggregation
Let's find out how much a customer spent in each individual year over the span of four years, 1998–2002.
from pyspark.sql.functions import year, round, col

sale_sum_df = sales_df.select("CUST_ID", "TIME_ID", "AMOUNT_SOLD")
cust_wise_df = sale_sum_df.groupBy(round("CUST_ID", 0).alias("CUST_ID"),
                                   year(sale_sum_df["TIME_ID"]).alias("YEAR")).sum("AMOUNT_SOLD")
cust_wise_df.show(10)
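Note that sum("AMOUNT_SOLD") names the result column literally sum(AMOUNT_SOLD). If you prefer a clean name up front, agg with an alias performs the same aggregation; a minimal sketch, where the variable and column names are just illustrative:
from pyspark.sql.functions import sum as sum_
cust_wise_named = sale_sum_df.groupBy(round("CUST_ID", 0).alias("CUST_ID"),
                                      year(sale_sum_df["TIME_ID"]).alias("YEAR")).agg(sum_("AMOUNT_SOLD").alias("TOTAL_SPEND"))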
7. Data Sorting
#Sort the records on the basis of CUST_ID
cust_wise_df.orderBy(cust_wise_df.CUST_ID).show(15)
Let's check the year-wise spending of a single customer.
cust_wise_df.filter(cust_wise_df.CUST_ID==3261).show()
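Adding an orderBy makes the year-over-year trend easier to read:
cust_wise_df.filter(cust_wise_df.CUST_ID == 3261).orderBy("YEAR").show()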
8. Number of customer visits over time
sale_sum_df.groupBy(sale_sum_df["CUST_ID"].cast(IntegerType()).alias("CUST_ID")).count().show(10)
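Strictly speaking, count() here counts sale rows, not visits. If one distinct TIME_ID per customer approximates one visit (an assumption about this dataset), countDistinct gets closer to the intent:
from pyspark.sql.functions import countDistinct
sale_sum_df.groupBy(sale_sum_df["CUST_ID"].cast(IntegerType()).alias("CUST_ID")).agg(countDistinct("TIME_ID").alias("VISITS")).show(10)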
9. Total Sales of a Product over the span of 4 years
s_df = sales_df.select(round("PROD_ID", 0).alias("PROD_ID"),
                       year(sales_df["TIME_ID"]).alias("YEAR"),
                       "AMOUNT_SOLD")
#withColumnRenamed changes column name
s_df = s_df.withColumnRenamed("PROD_ID", "PROD_ID_S")
p_df = prod_df.select("PROD_ID", "PROD_NAME")
p_df = p_df.withColumnRenamed("PROD_ID", "PROD_ID_P")

#join the two dataframes created above
prod_sales_df = s_df.join(p_df, p_df.PROD_ID_P == s_df.PROD_ID_S, how='inner')

#perform groupby and aggregation to sum the sales amount product-wise
prod_sales = prod_sales_df.groupBy('PROD_ID_S', 'PROD_NAME').sum('AMOUNT_SOLD')
prod_sales = prod_sales.select(col('PROD_ID_S').alias('PROD_ID'),
                               'PROD_NAME',
                               col('sum(AMOUNT_SOLD)').alias('TOTAL_SALES'))
prod_sales.show(10)
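Sorting the result in descending order surfaces the best-selling products first:
prod_sales.orderBy(col('TOTAL_SALES').desc()).show(10)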
10. Channel-wise Total Sales
#find out which channel contributed most to the sales
c_df = chan_df.select(col("CHANNEL_ID").alias("CHANNEL_ID_C"),
                      col("CHANNEL_DESC").alias("CHANNEL_NAME"))
sa_df = sales_df.select(col("CHANNEL_ID").alias("CHANNEL_ID_S"), "AMOUNT_SOLD")
chan_sales_df = sa_df.join(c_df, c_df.CHANNEL_ID_C == sa_df.CHANNEL_ID_S, how="inner")

chan_sale = chan_sales_df.groupBy(round("CHANNEL_ID_C", 0).alias("CHANNEL_ID")).sum("AMOUNT_SOLD")
chan_top_sales = chan_sale.withColumnRenamed("sum(AMOUNT_SOLD)", "TOT_AMOUNT")
chan_top_sales.orderBy("CHANNEL_ID").show()
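Grouping by the numeric ID alone drops the channel name from the output; grouping by both columns keeps it readable and, assuming each CHANNEL_ID maps to a single CHANNEL_NAME, yields the same totals:
chan_named = chan_sales_df.groupBy("CHANNEL_ID_C", "CHANNEL_NAME").sum("AMOUNT_SOLD")
chan_named.withColumnRenamed("sum(AMOUNT_SOLD)", "TOT_AMOUNT").orderBy("CHANNEL_ID_C").show()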
Summary
· PySpark DB connectivity
· Data display using show()
· Schema and columns of a DataFrame
· Applying select and filter conditions on DataFrames
· GroupBy and aggregation
· Renaming columns
· Some data insights
Hurray! With this, we have completed an Exploratory Data Analysis using PySpark and tried to make the data tell a sensible story. In upcoming articles on Data Analysis, I will share some more PySpark functionality.