
PySpark: Handle Dataset With Column Separator in Data
Author(s): Vivek Chaudhary
The objective of this blog is to handle a special scenario where the column separator or delimiter is present in the data itself. Handling such a dataset can sometimes be a headache for PySpark developers, but it still has to be handled. In this blog, I will share my approach to the challenge; I am open to learning, so please share your approach as well.

The dataset basically looks like below:
#first line is the header
NAME|AGE|DEP
Vivek|Chaudhary|32|BSC
John|Morgan|30|BE
Ashwin|Rao|30|BE
The dataset contains three columns 'NAME', 'AGE', 'DEP' separated by the delimiter '|'. And if we look closely at the data, the NAME column values also contain '|'.
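If you want to follow along, the sample file can be created with plain Python before starting Spark. This is only a minimal sketch; the file name delimit_data.txt and its location are assumptions that should match the path you read from below.
#create the sample data file (file name and location are assumptions)
sample_data="NAME|AGE|DEP\nVivek|Chaudhary|32|BSC\nJohn|Morgan|30|BE\nAshwin|Rao|30|BE\n"
with open("delimit_data.txt","w") as f:
    f.write(sample_data)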
Let's see how to proceed with the same:
Step 1. Read the dataset using the read.csv() method of Spark:
#create spark session
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('delimit').getOrCreate()
The above commands connect us to the Spark environment and let us read the dataset using spark.read.csv().
#create dataframe
df=spark.read.option('delimiter','|').csv(r'<path>\delimit_data.txt',inferSchema=True,header=True)
df.show()

After reading from the file and pulling the data into memory, this is how it looks. But wait, where is the last column's data? Column AGE must have an integer data type, but we witnessed something else. This is not what we expected; a complete mismatch, isn't it? The answer is yes, it's a mess. Reminds me of the Bebe Rexha song "I'm a Mess".
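To see the mismatch concretely, we can inspect the schema Spark inferred for the df created above; this is just a quick diagnostic, not part of the fix.
#inspect the inferred schema and the parsed rows
df.printSchema()
df.show(truncate=False)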
Now, let's learn how we can fix this.
Step 2. Read the data again, but this time use the read.text() method:
df=spark.read.text(r'C:\Users\lenovo\Python_Pyspark_Corp_Training\delimit_data.txt')
df.show(truncate=0)

#extract first row as this is our header
head=df.first()[0]
schema=['fname','lname','age','dep']
print(schema)
Output: ['fname', 'lname', 'age', 'dep']
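Notice that the header itself carries only three names while every data row splits into four values, which is why we define a four-column schema of our own. A quick check makes the gap visible:
#the header splits into three names, but each row has four values
print(head.split('|'))
#Output: ['NAME', 'AGE', 'DEP']
print(len(schema))
#Output: 4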
The next step is to split the dataset on the basis of the column separator:
#filter the header, separate the columns and apply the schema
df_new=df.filter(df['value']!=head).rdd.map(lambda x:x[0].split('|')).toDF(schema)
df_new.show()

Now, we have successfully separated the strain. Wait, what strain? No dude, it's not coronavirus, it's only textual data. Keep it simple, buddy.
We have successfully separated the pipe '|' delimited column ('name') data into two columns. Now the data is cleaner and can be worked with ease.
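As a side note, the same split can also be done without dropping down to the RDD API, using pyspark.sql.functions.split on the raw value column. This is a rough alternative sketch with the same assumed column names:
from pyspark.sql.functions import split, col
#split takes a regex, so the pipe must be escaped
parts=split(col('value'),r'\|')
df_alt=df.filter(df['value']!=head).select(parts.getItem(0).alias('fname'),parts.getItem(1).alias('lname'),parts.getItem(2).alias('age'),parts.getItem(3).alias('dep'))
df_alt.show()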
Next, concat the columns 'fname' and 'lname':
from pyspark.sql.functions import concat, col, lit
df1=df_new.withColumn('fullname',concat(col('fname'),lit('|'),col('lname')))
df1.show()
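The same fullname column can also be built with concat_ws, which takes the separator once instead of a lit('|') between the columns; a small alternative sketch:
from pyspark.sql.functions import concat_ws, col
#concat_ws inserts the '|' separator between the two columns for us
df1=df_new.withColumn('fullname',concat_ws('|',col('fname'),col('lname')))
df1.show()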

To validate the data transformation, we will write the transformed dataset to a CSV file and then read it back using the read.csv() method.
df1.write.option('sep','|').mode('overwrite').option('header','true').csv(r'<file_path>\cust_sep.csv')
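Keep in mind that Spark writes this output as a directory of part files rather than a single CSV. If a single file is easier to inspect manually, the DataFrame can be coalesced to one partition first; a sketch, with the output path kept as a placeholder:
#coalesce to one partition so the output directory holds a single part file
df1.coalesce(1).write.option('sep','|').mode('overwrite').option('header','true').csv(r'<file_path>\cust_sep.csv')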
The next step is Data Validation:
df=spark.read.option('delimiter','|').csv(r'<filepath>',inferSchema=True,header=True)
df.show()

The data looks in shape now, the way we wanted it.
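A quick look at the inferred schema confirms the fix; with the separator handled, the age column should now come back as an integer type.
#verify the inferred data types after the round trip
df.printSchema()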
A small exercise: try with a different delimiter and let me know if you find any anomaly. That's it for this blog. I will come up with a different scenario next time.
Thanks to all for reading my blog. Do share your views or feedback.