Pyspark Handle Dataset With Columns Separator in Data
Last Updated on January 11, 2021 by Editorial Team
Author(s): Vivek Chaudhary
Programming
The objective of this blog is to handle a special scenario where the column separator or delimiter is present in the dataset. Handling such a type of dataset can be sometimes a headache for Pyspark Developers but anyhow it has to be handled. In my blog, I will share my approach to handling the challenge, I am open to learning so please share your approach asΒ well.
Dataset basically looks likeΒ below:
#first line is the header
NAME|AGE|DEP
Vivek|Chaudhary|32|BSC
John|Morgan|30|BE
Ashwin|Rao|30|BE
The dataset contains three columns βNameβ, βAGEβ, βDEPβ separated by delimiter β|β. And if we pay focus on the data set it also contains β|β for the columnΒ name.
Letβs see further how to proceed with theΒ same:
Step1. Read the dataset using read.csv() method ofΒ spark:
#create spark session
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName(βdelimitβ).getOrCreate()
The above command helps us to connect to the spark environment and lets us read the dataset using spark.read.csv()
#create dataframe
df=spark.read.option(βdelimiterβ,β|β).csv(rβ<path>\delimit_data.txtβ,inferSchema=True,header=True)
df.show()
After reading from the file and pulling data into memory this is how it looks like. But wait, where is the last column data, column AGE must have an integer data type but we witnessed something else. This is not what we expected. A mess a complete mismatch isnβt this? The answer is Yes itβs a mess. Reminds me of Bebe Rexha song βIβm a MessβΒ ππ
Now, let's learn how we can fixΒ this.
Step2. Read the data again but this time use read.text() method:
df=spark.read.text(rβC:\Users\lenovo\Python_Pyspark_Corp_Training\delimit_data.txtβ)
df.show(truncate=0)
#extract first row as this is our header
head=df.first()[0]
schema=[βfnameβ,βlnameβ,βageβ,βdepβ]
print(schema)
Output: ['fname', 'lname', 'age', 'dep']
The next step is to split the dataset on basis of column separator:
#filter the header, separate the columns and apply the schema
df_new=df.filter(df[βvalueβ]!=head).rdd.map(lambda x:x[0].split(β|β)).toDF(schema)
df_new.show()
Now, we have successfully separated the strain. Wait what Strain? No Dude itβs not Corona Virus itβs only textual data. Keep it, simple buddy.Β ππ
We have successfully separated the pipe β|β delimited column (βnameβ) data into two columns. Now the data is more cleaned to be played withΒ ease.
Next, concat the columns βfnameβ and βlnameβΒ :
from pyspark.sql.functions import concat, col, lit
df1=df_new.withColumn(βfullnameβ,concat(col(βfnameβ),lit(β|β),col(βlnameβ)))
df1.show()
To validate the data transformation we will write the transformed dataset to a CSV file and then read it using read.csv() method.
df1.write.option(βsepβ,β|β).mode(βoverwriteβ).option(βheaderβ,βtrueβ).csv(rβ<file_path>\cust_sep.csvβ)
The next step is Data Validation:
df=spark.read.option(βdelimiterβ,β|β).csv(r<filepath>,inferSchema=True,header=True)
df.show()
Data looks in shape now and the way we wanted.
A small exercise, try with some different delimiter and let me know if you find any anomaly. Thatβs it with this blog. Will come up with a different scenario nextΒ time.
Thanks to all for reading my blog. Do share your views or feedback.
Pyspark Handle Dataset With Columns Separator in Data was originally published in Towards AI on Medium, where people are continuing the conversation by highlighting and responding to this story.
Published via Towards AI