
How to Train XGBoost Model With PySpark

Last Updated on July 25, 2023 by Editorial Team

Author(s): Divy Shah

Originally published on Towards AI.

Why XGBoost?

XGBoost (eXtreme Gradient Boosting) is one of the most popular and widely used ML algorithms among Data Scientists in every industry. It is very efficient in terms of computing time and memory usage, handles missing values out of the box, and parallelizes the training process.

Nowadays, with dataset sizes growing rapidly, distributed training has become essential, so in this blog we are going to explore how to integrate XGBoost with PySpark and do model training and scoring.

One can easily use the ML algorithms available inside pyspark.ml or MLlib, but to use XGBoost in the same way, we have to add a few external dependencies and the Python XGBoost wrapper. Another way is to use the XGBoost native framework directly with PySpark, which is now supported by the latest version of XGBoost (the only constraint is that it is supported only on Python version ≥ 3.8); to learn how to use that with PySpark, check out this document. (Thanks to the DMLC community!)

In this blog, however, our main focus is how to integrate XGBoost with PySpark on Python versions < 3.8.
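For reference, here is a minimal sketch of that native approach (assuming xgboost ≥ 1.7 and Python ≥ 3.8; the SparkXGBClassifier estimator and its parameters come from the xgboost.spark module, and nothing later in this post depends on it):

from xgboost.spark import SparkXGBClassifier  # requires xgboost >= 1.7 and Python >= 3.8

# Sketch only: the native distributed estimator, trained on an already-assembled DataFrame.
xgb_native = SparkXGBClassifier(
    features_col="features",  # output column of a VectorAssembler
    label_col="label",
    num_workers=2,            # number of parallel Spark tasks used for training
)
# native_model = xgb_native.fit(train_df)  # train_df: a hypothetical prepared DataFrame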

Before starting the integration part, kindly download the prerequisite files from this link.

Once you have downloaded all 3 files, we are all set to integrate XGBoost with PySpark. Follow the steps below:

  1. As shown in the example code below, add the JAR files to the PYSPARK_SUBMIT_ARGS section (make sure to use the exact path of the file location).
  2. Create your Spark session.
  3. Add the XGBoost Python wrapper code file (.zip file) to the sparkContext. (We do this to support the XGBoost import; again, make sure to use the correct path of the zip file.)
import os
import findspark
import numpy as np
from pyspark.sql import SparkSession

## Add the path of the downloaded jar files
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark.jar,xgboost4j.jar pyspark-shell'

findspark.init()

## Create the Spark session
spark = SparkSession \
    .builder \
    .appName("XGBoost_PySpark") \
    .master("local[*]") \
    .getOrCreate()

## Add the XGBoost python wrapper (zip file) so that sparkxgb can be imported
spark.sparkContext.addPyFile("sparkxgb.zip")

Once you are done with the above steps, you can cross-check the setup by importing the XGBoost classifier or regressor.

from sparkxgb.xgboost import XGBoostClassificationModel, XGBoostClassifier

If you get a JAR error, it may be due to a version mismatch; in that case, you can download another version of the JAR files from here.

Now that everything is in place, let’s try to build one simple model by following the steps below.

Here we have taken an open-source loan prediction dataset and try to predict whether a loan will be approved or not. Loan_Status is our target variable (since it is in Y/N format, we have converted it to 1/0).

  1. Add the required Spark Libraries
from pyspark.ml.feature import StringIndexer, VectorAssembler
from sparkxgb.xgboost import XGBoostClassificationModel, XGBoostClassifier
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import functions as F

2. Load the dataset and do the required pre-processing

data = spark.read.parquet('train.parquet')
data = data.withColumn('label', F.when(F.col('Loan_Status') == 'Y', 1)
                                 .otherwise(0))
data.show(5)
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|label|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      null|           360.0|           1.0|        Urban|          Y|    1|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|     128.0|           360.0|           1.0|        Rural|          N|    0|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|      66.0|           360.0|           1.0|        Urban|          Y|    1|
|LP001006|  Male|    Yes|         0|Not Graduate|           No|           2583|           2358.0|     120.0|           360.0|           1.0|        Urban|          Y|    1|
|LP001008|  Male|     No|         0|    Graduate|           No|           6000|              0.0|     141.0|           360.0|           1.0|        Urban|          Y|    1|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----+

As we can see in the sample data above, a few columns have categorical/string values, so we need to convert them into numeric values before passing them to the model. PySpark provides StringIndexer and OneHotEncoder for this purpose; here we are going to use StringIndexer.

index1 = StringIndexer().setInputCol("Gender").setOutputCol("GenderIndex").setHandleInvalid("keep")
index2 = StringIndexer().setInputCol("Married").setOutputCol("MarriedIndex").setHandleInvalid("keep")
index3 = StringIndexer().setInputCol("Education").setOutputCol("EducationIndex").setHandleInvalid("keep")
index4 = StringIndexer().setInputCol("Self_Employed").setOutputCol("SelfEmployedIndex").setHandleInvalid("keep")
index5 = StringIndexer().setInputCol("Property_Area").setOutputCol("PropertyAreaIndex").setHandleInvalid("keep")
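If you prefer one-hot vectors over plain index values, a short sketch using the multi-column OneHotEncoder available in Spark 3.x could look like the following (the output column names are only illustrative):

from pyspark.ml.feature import OneHotEncoder

# Optional: encode the indexed columns as one-hot vectors instead of raw indices.
encoder = OneHotEncoder(
    inputCols=["GenderIndex", "MarriedIndex", "EducationIndex",
               "SelfEmployedIndex", "PropertyAreaIndex"],
    outputCols=["GenderVec", "MarriedVec", "EducationVec",
                "SelfEmployedVec", "PropertyAreaVec"])

If you go this route, the *Vec columns would replace the *Index columns in the feature list defined below.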

Now, as we can see, we have multiple StringIndexer objects, and we would need to apply each indexer's transform separately, which is a lengthy process. To club all the transformations, feature vectorization, and model fitting together, PySpark ML provides the concept of an ML Pipeline, in which we can combine several steps and run them in one go.

Before creating the Pipeline object, let’s define the VectorAssembler and model objects.

## define list of your final features
features = ['GenderIndex', 'MarriedIndex', 'EducationIndex', 'SelfEmployedIndex', 'PropertyAreaIndex',
'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History']

vec_assembler = VectorAssembler(inputCols=features, outputCol='features', handleInvalid='keep')

xgb = XGBoostClassifier(objective="binary:logistic", seed=1712,
                        featuresCol="features",
                        labelCol="label",
                        missing=0.0)
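To actually distribute the boosting work across Spark executors, the wrapper also exposes parameters such as numRound (also used in the tuning grid later) and numWorkers; exact names can vary between wrapper versions, so treat this as a sketch and verify it against your sparkxgb.zip.

# Sketch: the same classifier with explicit boosting rounds and Spark workers.
# numRound / numWorkers are assumed to be exposed by the sparkxgb wrapper in use.
xgb_distributed = XGBoostClassifier(objective="binary:logistic", seed=1712,
                                    featuresCol="features",
                                    labelCol="label",
                                    missing=0.0,
                                    numRound=100,   # number of boosting rounds
                                    numWorkers=2)   # parallel Spark tasks for training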

Now let’s create the Pipeline object and build the final model.

3. Create ML Pipeline

# here add all your steps inside setStages

pipeline = Pipeline().setStages([index1, index2, index3, index4, index5, vec_assembler, xgb])

# split the data in train and test
trainDF, testDF = data.randomSplit([0.7, 0.3], seed=1712)

model = pipeline.fit(trainDF)

# Generate the prediction on test data

predictions = model.transform(testDF)[['Loan_ID', 'prediction', 'label']]
predictions.show()
+--------+----------+-----+
| Loan_ID|prediction|label|
+--------+----------+-----+
|LP001005|       1.0|    1|
|LP001013|       1.0|    1|
|LP001018|       1.0|    1|
|LP001027|       1.0|    1|
|LP001032|       1.0|    1|
+--------+----------+-----+

Wow, we finally built an XGBoost model and tested it using PySpark!

4. Check the performance metrics

from pyspark.sql.types import DoubleType

predictionAndLabels = predictions.select(['prediction', 'label']) \
    .withColumn('label', F.col('label').cast(DoubleType())).rdd

metrics = MulticlassMetrics(predictionAndLabels)

cm = metrics.confusionMatrix().toArray()

accuracy = (cm[0][0] + cm[1][1]) / cm.sum()
precision = cm[1][1] / (cm[0][1] + cm[1][1])
recall = cm[1][1] / (cm[1][0] + cm[1][1])

print(accuracy, precision, recall)
0.7015706806282722 0.7517241379310344 0.8384615384615385
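As a quick cross-check on the accuracy figure, Spark's built-in evaluator can compute it directly from the predictions DataFrame (a minimal sketch):

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy computed directly on the 'prediction' and 'label' columns.
acc_evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                                  predictionCol='prediction',
                                                  metricName='accuracy')
print(acc_evaluator.evaluate(predictions))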

5. Hyper-parameter tuning (optional step)

If you want to tune your XGBoost parameters, follow the code below. Be aware that an exhaustive grid search is not really feasible on a large amount of data, since it builds a model for every single combination of parameters; a better approach is to use something like scikit-learn's RandomizedSearchCV and reuse the resulting parameters here, or to write PySpark code that picks random parameter values and assigns them to the model on each iteration (a sketch of this is shown after the cross-validation code below).

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

xgbEval = BinaryClassificationEvaluator()

# Define your list of grid search parameters

paramGrid = (ParamGridBuilder()
             .addGrid(xgb.alpha, [1e-5, 1e-2, 0.1])
             .addGrid(xgb.eta, [0.001, 0.01])
             .addGrid(xgb.numRound, [150, 160])
             .addGrid(xgb.maxDepth, range(3, 7, 3))
             .addGrid(xgb.minChildWeight, [3.0, 4.0])
             .addGrid(xgb.gamma, [i / 10.0 for i in range(0, 2)])
             .addGrid(xgb.colsampleBytree, [i / 10.0 for i in range(3, 6)])
             .addGrid(xgb.subsample, [0.4, 0.6])
             .build())

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=xgbEval, numFolds=3)
cvModel = cv.fit(trainDF)
cvPreds = cvModel.transform(testDF)
xgbEval.evaluate(cvPreds)

## Print the tuned params
cvModel.bestModel.extractParamMap()
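As mentioned above, a cheaper alternative to the exhaustive grid is to sample only a handful of random parameter combinations. Here is a minimal sketch of that idea (the parameter lists and the number of trials are illustrative; it reuses the pipeline, evaluator, and training split defined earlier):

import random

random.seed(1712)

# Illustrative search space; any parameter exposed by the classifier could go here.
search_space = {
    xgb.eta: [0.001, 0.01, 0.05, 0.1],
    xgb.maxDepth: [3, 4, 5, 6],
    xgb.subsample: [0.4, 0.6, 0.8],
}

# Hold out part of the training data for picking the best combination.
trainSub, validSub = trainDF.randomSplit([0.8, 0.2], seed=1712)

best_auc, best_params = -1.0, None
for _ in range(5):  # try 5 random combinations instead of the full grid
    params = {p: random.choice(vals) for p, vals in search_space.items()}
    fitted = pipeline.fit(trainSub, params)  # fits a copy of the pipeline with these params
    auc = xgbEval.evaluate(fitted.transform(validSub))
    if auc > best_auc:
        best_auc, best_params = auc, params

print(best_auc, {p.name: v for p, v in best_params.items()})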

Thanks for taking the time to read my blog; I hope it was helpful. To summarise, we learned a bit about XGBoost and its advantages, and we walked through a step-by-step implementation of XGBoost in PySpark using external dependencies. I look forward to all kinds of suggestions from my readers; let me know what you would like me to write about next.


Published via Towards AI
