PySpark Kafka Structured Streaming Data Pipeline
Last Updated on July 21, 2023 by Editorial Team
Author(s): Vivek Chaudhary
Originally published on Towards AI.
The objective of this article is to build an understanding of how to create a data pipeline that processes data using Spark Structured Streaming and Apache Kafka.
Business Case Explanation:
Let us consider a store that generates Customer Invoices, and those Invoices arrive at an integrated platform in real time to inform the customer how many "Edge Reward" points they have earned on their shopping.
Invoices from various stores across the city of New Delhi travel to a Kafka topic in real time, and based on the total shopping amount, our Kafka consumer calculates the Edge Points and sends a notification message to the customer.
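(In the consumer code later in this article, the reward rule is a flat 20% of the invoice total: for example, a bill of 1687.00 earns 1687.00 * 0.2 = 337.4 Edge Reward points.)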
1. Invoice Dataset:
Invoice data is generated in JSON format. For ease of understanding, I have not created a nested dataset.
{"BillNo": "93647513", "CreatedTime": 1595688902254, "StoreID": "STR8510", "PaymentMode": "CARD", "CustomerCardNo": "5241-xxxx-xxxx-1142", "ItemID": "258", "ItemDescription": "Almirah", "ItemPrice": 1687.0, "ItemQty": 1, "TotalValue": 1687.0}
{"BillNo": "93647514", "CreatedTime": 1595688901055, "StoreID": "STR8511", "PaymentMode": "CARD", "CustomerCardNo": "5413-xxxx-xxxx-1142", "ItemID": "259", "ItemDescription": "LED", "ItemPrice": 1800.0, "ItemQty": 2, "TotalValue": 3900.0}
{"BillNo": "93647515", "CreatedTime": 1595688902258, "StoreID": "STR8512", "PaymentMode": "CARD", "CustomerCardNo": "5346-xxxx-xxxx-1142", "ItemID": "257", "ItemDescription": "AC", "ItemPrice": 2290.0, "ItemQty": 2, "TotalValue": 4580.0}
2. Data Producer:
Invoice data is sent to the Kafka topic ADVICE (subscribed as "advice" by the consumer below).
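The producer code itself is not shown in this article; the following is a minimal sketch of what it could look like, assuming the kafka-python package, a broker on localhost:9092, and one of the sample invoices above:

# Hypothetical producer: publish one sample invoice as JSON to the "advice" topic.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

invoice = {"BillNo": "93647513", "CreatedTime": 1595688902254, "StoreID": "STR8510",
           "PaymentMode": "CARD", "CustomerCardNo": "5241-xxxx-xxxx-1142",
           "ItemID": "258", "ItemDescription": "Almirah", "ItemPrice": 1687.0,
           "ItemQty": 1, "TotalValue": 1687.0}

producer.send("advice", invoice)
producer.flush()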
3. Data Consumer:
import findspark
findspark.init()  # picks up SPARK_HOME; pass your Spark installation path if needed

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType

print("modules imported")

spark = SparkSession \
    .builder.appName("Kafka_read_mongo") \
    .master("local") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.3") \
    .getOrCreate()

# Schema of the incoming invoice JSON
schema = StructType([
    StructField("BillNo", StringType()),
    StructField("CreatedTime", LongType()),
    StructField("StoreID", StringType()),
    StructField("PaymentMode", StringType()),
    StructField("CustomerCardNo", StringType()),
    StructField("ItemID", StringType()),
    StructField("ItemDescription", StringType()),
    StructField("ItemPrice", DoubleType()),
    StructField("ItemQty", IntegerType()),
    StructField("TotalValue", DoubleType())])

print("Now time to connect to Kafka broker to read Invoice Data")

kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "advice") \
    .option("startingOffsets", "earliest") \
    .load()

print("Create Invoice DataFrame")
# Kafka delivers the message value as bytes; cast it to string and parse the JSON
invoice_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))

print("Create Notification DF")
notification_df = invoice_df.select("value.BillNo", "value.CustomerCardNo", "value.TotalValue") \
    .withColumn("EarnedLoyaltyPoints", expr("TotalValue * 0.2"))

print("Create Final Dataframe")
# Kafka accepts data in key-value format; the code below builds exactly that
kafka_target_df = notification_df.selectExpr("BillNo as key",
    """to_json(named_struct(
        'CustomerCardNo', CustomerCardNo,
        'TotalValue', TotalValue,
        'EarnedLoyaltyPoints', TotalValue * 0.2)) as value""")

################ tbc ##################
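Before wiring the notifications back to Kafka, it can help to sanity-check the parsed stream with a console sink. This is a minimal debugging sketch of my own, not part of the original pipeline, assuming the notification_df defined above and a short local run:

# Hypothetical debugging step: print parsed notification rows to the console
# instead of Kafka, to confirm the schema and the loyalty-points calculation.
debug_query = notification_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .start()

debug_query.awaitTermination(30)  # let it run for ~30 seconds
debug_query.stop()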
readStream(): Spark creates a streaming DataFrame through the DataStreamReader interface returned by SparkSession.readStream.
outputMode():
The "output" is what gets written out to the external storage. It can be defined in one of three modes:
- Complete Mode: The entire updated Result Table is written to the external storage. It is up to the storage connector to decide how to handle writing the entire table.
- Append Mode: Only the new rows appended to the Result Table since the last trigger are written to the external storage. This is applicable only to queries where existing rows in the Result Table are not expected to change.
- Update Mode: Only the rows that were updated in the Result Table since the last trigger are written to the external storage. Note that this differs from Complete Mode in that it outputs only the rows that have changed since the last trigger. If the query contains no aggregations, it is equivalent to Append Mode.
Note: In our use case we use Append Mode, since each invoice simply adds a new notification row and existing rows never change.
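To see where the other modes apply, consider an aggregated query: a running total of sales per store cannot use Append Mode because its result rows keep changing. The sketch below is illustrative only (it is not part of this article's pipeline); it builds on the invoice_df defined earlier and writes to the console sink:

# Hypothetical aggregation example: running sales total per store.
# Aggregated streaming results must be written in complete (or update) mode.
store_totals = invoice_df.select("value.StoreID", "value.TotalValue") \
    .groupBy("StoreID") \
    .sum("TotalValue")

agg_query = store_totals.writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()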
4. Sending Notification:
Since we do not have the infrastructure to send text messages to customers, this functionality is handled by sending a JSON message to another Kafka topic, NOTIFICATION, with the assumption that a downstream pipeline reads the messages from that topic and notifies customers about their earned loyalty points.
################ continuing ##################

print("writing to notification topic")

# At this point the notification message for the customer is ready, with the
# necessary details: CardNo, total shopping amount, and reward points earned.
# Note: Spark's Kafka sink requires a checkpoint location; the directory name
# below is illustrative.
notification_writer_query = kafka_target_df \
    .writeStream \
    .queryName("Notification Writer") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "notification") \
    .option("checkpointLocation", "notification-checkpoint") \
    .outputMode("append") \
    .start()

notification_writer_query.awaitTermination()
Notification messages are available on the topic.
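One quick way to confirm this is to read the topic back with a plain consumer. A minimal sketch, assuming the kafka-python package and the same local broker (this is not part of the original article):

# Hypothetical verification step: read back the messages the streaming query
# wrote to the "notification" topic and print key/value pairs.
from kafka import KafkaConsumer

consumer = KafkaConsumer("notification",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for message in consumer:
    print(message.key, message.value)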
That completes the design of a basic application that processes invoices received from stores, calculates reward points, and sends notifications to customers. I hope this helps share a bit of my knowledge.