Pyspark MLlib | Classification using Pyspark ML
Last Updated on July 26, 2023 by Editorial Team
Author(s): Muttineni Sai Rohith
Originally published on Towards AI.
Pyspark MLlib U+007C Classification using Pyspark ML
In the previous sections, we discussed about RDD, Dataframes, and Pyspark concepts. In this article, we will discuss about Pyspark MLlib and Spark ML. Later on, we will train a classifier for Car Evaluation data, by Encoding the data, Feature extraction and Developing classifier model using various algorithms and evaluate the results.
For a detailed tutorial about Pyspark, Pyspark RDD, and DataFrame concepts, Handling missing values, refer to the link below:
Pyspark For Beginners
PySpark is a Python API for Apache Spark. using PySpark we can run applications parallelly on the distributed clusterβ¦
blog.devgenius.io
Spark MLlib is a short form of spark machine-learning library. Pyspark MLlib is a wrapper over PySpark Core to do data analysis using machine-learning algorithms. It works on distributed systems and is scalable. We can find implementations of classification, clustering, linear regression, and other machine-learning algorithms in PySpark MLlib.
MLlib is Sparkβs scalable machine learning library consisting of common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as underlying optimization primitives.
Classification using Pyspark MLlib
As a part of this article, we will perform classification on the car evaluation dataset. This dataset consists of 6 attributes describing cars and one Target variable β car_type containing multiple Categories. The dataset used can be found here.
First, letβs create the sparkSession β
from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("Practice").getOrCreate()
spark
Letβs load the data from the CSV file β
df_pyspark = spark.read.csv("car_data.csv",inferSchema=True, header=True)df_pyspark.show(5)
For most machine learning algorithms, numerical data is a must β So letβs see the schema of this DataFrame.
As we can see, we have String Columns. Let's encode them into Integers using Pyspark StringIndexer.
from pyspark.ml.feature import StringIndexercategoricalColumns = ["buying","maintainence","doors","persons","lug_boot","safety","car_type"]l = []for categoricalCol in categoricalColumns: stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol+"_encoded").fit(df_pyspark) df_pyspark = stringIndexer.transform(df_pyspark) df_pyspark = df_pyspark.withColumn(categoricalCol+"_encoded", df_pyspark[categoricalCol+"_encoded"].cast('int'))encoded_df = df_pyspark.select("buying_encoded","doors","maintainence_encoded","persons_encoded","lug_boot_encoded","safety_encoded","car_type_encoded")encoded_df.show(5)
In the above code, we have imported StringIndexer and transformed each String Column into a Numerical column. Initially, StringIndexer returns the data in float format, so in the next step, we have Performed casting and converted float values into Numerical. Once we have created the encoded DataFrame, we select only the encoded values. The output of Encoded Dataframe is as follows β
As the data is ready, Letβs perform feature extraction using VectorAssembler from Pyspark
from pyspark.ml.feature import VectorAssemblerfeatureAssembler = VectorAssembler(inputCols=["buying_encoded","doors_encoded","maintainence_encoded","persons_encoded","lug_boot_encoded","safety_encoded"],outputCol="features")output = featureAssembler.transform(encoded_df)output.select("features","car_type_encoded").show(5)
In Pyspark, unlike methods used in pandas, we convert all the independent columns into one feature using VectorAssembler. The feature created is used for training. Our final DataFrame containing the required information is as below:
Let's split the data for training and testing.
train, test = output.randomSplit([0.8, 0.2], seed=17)
unlike train_test_split from scikit-learn, we perform splitting using random split available in Pyspark DataFrame
Our data is ready, so letβs prepare the model.
Logistic Regression
from pyspark.ml.classification import LogisticRegressionlr = LogisticRegression(featuresCol = 'features', labelCol = 'car_type_encoded', maxIter=10)lrModel = lr.fit(train)
We have created a logistic regression model by importing Logistic Regression from Pyspark.ml, We gave βfeaturesβ β featuresCol as independent Variable and βcar_type_encodedβ β labelCol as a dependent Variable.
Letβs use our model to predict the test data.
predictions = lrModel.transform(test)predictions.show(5)
Here, I trimmed a few columns to show the priority columns. As we can see, we have predicted the car_type.
Letβs evaluate the model. Unlike metrics used traditionally, Pyspark provides the following metrics β
As we have MultiClass DataFrame, let's use MulticlassClassificationEvaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluatorevaluator = MulticlassClassificationEvaluator()evaluator.setLabelCol("car_type_encoded")evaluator.setPredictionCol("prediction")evaluator.evaluate(predictions)
As we can see, our model performed poorly using Logistic Regression as it contains multiple classes. So Let's use the Decision Tree to improve the performance.
Decision trees are widely used since they are easy to interpret, handle categorical features, extend to multi-class classification, do not require feature scaling, and are able to capture non-linearities and feature interactions.
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator#Training Modeldt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'car_type_encoded', maxDepth = 3)
dtModel = dt.fit(train)#Predictionpredictions = dtModel.transform(test)#Evaluating the performanceevaluator = MulticlassClassificationEvaluator()
evaluator.setLabelCol("car_type_encoded")
evaluator.setPredictionCol("prediction")print("Test Area Under ROC: ",evaluator.evaluate(predictions))
As we can, even though the performance is improved compared to the Logistic Regression model, still the performance is not that satisfactory.
So letβs use Ensemble methods like Random Forest to improve the performance.
Random Forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator#Training Model
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'car_type_encoded', numTrees = 500, maxDepth = 10)
rfModel = rf.fit(train)#Prediction
predictions = rfModel.transform(test)#Evaluating the performance
evaluator = MulticlassClassificationEvaluator()
evaluator.setLabelCol("car_type_encoded")
evaluator.setPredictionCol("prediction")
print("Test Area Under ROC: ",evaluator.evaluate(predictions))
As we have used hyperparameters numTrees and maxDepth, we can see that performance of the model is improved a lot, and we got good results.
Here, we have used minimal methods and achieved the desired performance. Alternatively, we can use many algorithms and techniques that are present in pyspark Ml to build the models.
We have covered all the major concepts using Pyspark in this series of articles.
I look forward to hearing your valuable feedback or questions. Happy to assistβ¦
Happy codingβ¦.
Join thousands of data leaders on the AI newsletter. Join over 80,000 subscribers and keep up to date with the latest developments in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming aΒ sponsor.
Published via Towards AI