Pyspark Kafka Structured Streaming Data Pipeline

Last Updated on July 21, 2023 by Editorial Team

Author(s): Vivek Chaudhary

Originally published on Towards AI.

This article walks through building a data pipeline that processes data with Apache Spark Structured Streaming and Apache Kafka.

Source: Kafka-Spark streaming

Business Case Explanation:

Consider a store that generates customer invoices. The invoices arrive at an integrated platform in real time, and the platform tells each customer how many "Edge Reward" points they have earned on their purchase.

Invoices from stores across New Delhi are published to a Kafka topic in real time. Based on the total shopping amount, our Kafka consumer calculates the Edge Reward points and sends a notification message to the customer.
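As a quick illustration of the reward rule, the sketch below computes the points in plain Python. The 20% rate matches the TotalValue * 0.2 expression used in the consumer code later in this article; the function name earned_points and the negative-value check are hypothetical additions for illustration only.

```python
# Hypothetical helper: mirrors the rule "points = 20% of the invoice total".
POINTS_RATE = 0.2  # same factor as the TotalValue * 0.2 expression in the consumer


def earned_points(total_value: float, rate: float = POINTS_RATE) -> float:
    """Return the reward points earned for one invoice total."""
    if total_value < 0:
        # Validation is an assumption; the article does not discuss bad input.
        raise ValueError("total_value must be non-negative")
    return total_value * rate


if __name__ == "__main__":
    for total in (1687.0, 4580.0):
        print(total, "->", round(earned_points(total), 2))
```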

1. Invoice Dataset:

Invoice data is generated in JSON format. For ease of understanding, I have kept the records flat rather than nested.

{"BillNo":"93647513","CreatedTime":1595688902254,"StoreID":"STR8510","PaymentMode":"CARD","CustomerCardNo":"5241-xxxx-xxxx-1142","ItemID":"258","ItemDescription":"Almirah","ItemPrice":1687.0,"ItemQty":1,"TotalValue":1687.0}
{"BillNo":"93647514","CreatedTime":1595688901055,"StoreID":"STR8511","PaymentMode":"CARD","CustomerCardNo":"5413-xxxx-xxxx-1142","ItemID":"259","ItemDescription":"LED","ItemPrice":1800.0,"ItemQty":2,"TotalValue":3900.0}
{"BillNo":"93647515","CreatedTime":1595688902258,"StoreID":"STR8512","PaymentMode":"CARD","CustomerCardNo":"5346-xxxx-xxxx-1142","ItemID":"257","ItemDescription":"AC","ItemPrice":2290.0,"ItemQty":2,"TotalValue":4580.0}
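To make the record layout concrete, here is a minimal sketch that parses one invoice line into a typed Python object. The field names and types follow the sample records; the Invoice dataclass and the parse_invoice helper are hypothetical names, and reading CreatedTime as epoch milliseconds is an assumption based on the magnitude of the sample values.

```python
import json
from dataclasses import dataclass


@dataclass
class Invoice:
    # Field names and types follow the sample records in this article.
    BillNo: str
    CreatedTime: int  # assumed to be epoch milliseconds
    StoreID: str
    PaymentMode: str
    CustomerCardNo: str
    ItemID: str
    ItemDescription: str
    ItemPrice: float
    ItemQty: int
    TotalValue: float


def parse_invoice(line: str) -> Invoice:
    """Parse one JSON line; raises TypeError on missing or unexpected fields."""
    return Invoice(**json.loads(line))


sample = (
    '{"BillNo":"93647513","CreatedTime":1595688902254,"StoreID":"STR8510",'
    '"PaymentMode":"CARD","CustomerCardNo":"5241-xxxx-xxxx-1142","ItemID":"258",'
    '"ItemDescription":"Almirah","ItemPrice":1687.0,"ItemQty":1,"TotalValue":1687.0}'
)
invoice = parse_invoice(sample)
print(invoice.BillNo, invoice.TotalValue)
```

Because the records are flat, each JSON key maps directly onto one field, which is also what lets the Spark schema later in the article stay a simple list of fields.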

2. Data Producer:

Invoice data is sent to the Kafka topic "advice" (the lowercase name the consumer subscribes to; Kafka topic names are case-sensitive).

Advice systems Kafka
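The article does not show the producer code, so the following is only a sketch of what it might look like. It assumes the kafka-python client, whose KafkaProducer exposes send(topic, value=..., key=...). The producer is passed in as an argument so the encoding logic can be exercised without a running broker. The helper names encode_invoice and send_invoices, and the choice of BillNo as the message key, are assumptions for illustration.

```python
import json
from typing import Iterable, Tuple


def encode_invoice(invoice: dict) -> Tuple[bytes, bytes]:
    """Return (key, value) bytes for one invoice; keying by BillNo is an assumption."""
    key = invoice["BillNo"].encode("utf-8")
    value = json.dumps(invoice, separators=(",", ":")).encode("utf-8")
    return key, value


def send_invoices(producer, topic: str, invoices: Iterable[dict]) -> int:
    """Send each invoice through any object exposing send(topic, key=..., value=...)."""
    count = 0
    for invoice in invoices:
        key, value = encode_invoice(invoice)
        producer.send(topic, key=key, value=value)
        count += 1
    return count


# With a real broker and the kafka-python package (both assumptions), usage might be:
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers="localhost:9092")
#   send_invoices(producer, "advice", invoices)
#   producer.flush()
```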

3. Data Consumer:

import findspark
findspark.init()  # pass your Spark home path here if it is not detected automatically

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType
print('modules imported')

# The Kafka connector package must match your Spark and Scala versions
spark = SparkSession \
    .builder.appName('Kafka_read_mongo') \
    .master("local") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.3") \
    .getOrCreate()

# Schema of the invoice JSON carried in the Kafka message value
schema = StructType([
    StructField("BillNo", StringType()),
    StructField("CreatedTime", LongType()),
    StructField("StoreID", StringType()),
    StructField("PaymentMode", StringType()),
    StructField("CustomerCardNo", StringType()),
    StructField("ItemID", StringType()),
    StructField("ItemDescription", StringType()),
    StructField("ItemPrice", DoubleType()),
    StructField("ItemQty", IntegerType()),
    StructField("TotalValue", DoubleType())])

print('Now time to connect to Kafka broker to read Invoice Data')
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "advice") \
    .option("startingOffsets", "earliest") \
    .load()

print('Create Invoice DataFrames')
# The Kafka value arrives as bytes: cast it to a string, then parse it with the schema
invoice_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))

print('Create Notification DF')
notification_df = invoice_df \
    .select("value.BillNo", "value.CustomerCardNo", "value.TotalValue") \
    .withColumn("EarnedLoyaltyPoints", expr("TotalValue * 0.2"))

print('Create Final Dataframe')
# Kafka expects records as key-value pairs; the code below builds them
kafka_target_df = notification_df.selectExpr("BillNo as key",
    """to_json(named_struct(
    'CustomerCardNo', CustomerCardNo,
    'TotalValue', TotalValue,
    'EarnedLoyaltyPoints', TotalValue * 0.2)) as value""")
################tbc##################

readStream: Spark creates a streaming DataFrame through the DataStreamReader interface returned by SparkSession.readStream (in PySpark this is a property, so it is accessed without parentheses).

outputMode():

The "Output" is defined as what gets written out to the external storage. The output can be written in one of the following modes:

  • Complete Mode: The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle the writing of the entire table.
  • Append Mode: Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only to the queries where existing rows in the Result Table are not expected to change.
  • Update Mode: Only the rows that were updated in the Result Table since the last trigger will be written to the external storage. Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.
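To make the difference between the three modes concrete, here is a toy model in plain Python. It is not how Spark implements output modes; it only compares a Result Table before and after one trigger and reports which rows each mode would write. The function rows_to_write and the sample tables are hypothetical.

```python
from typing import Dict


def rows_to_write(previous: Dict[str, float],
                  current: Dict[str, float],
                  mode: str) -> Dict[str, float]:
    """Toy model: which Result Table rows are written after one trigger."""
    if mode == "complete":
        return dict(current)  # the whole table, on every trigger
    if mode == "append":
        # only rows that did not exist before (assumes old rows never change)
        return {k: v for k, v in current.items() if k not in previous}
    if mode == "update":
        # new rows plus rows whose value changed
        return {k: v for k, v in current.items()
                if k not in previous or previous[k] != v}
    raise ValueError(f"unknown output mode: {mode}")


# Aggregated table (illustrative totals per store): existing rows can change.
agg_before = {"STR8510": 1687.0, "STR8511": 3900.0}
agg_after = {"STR8510": 1687.0, "STR8511": 5000.0, "STR8512": 4580.0}
print("complete:", rows_to_write(agg_before, agg_after, "complete"))
print("update:  ", rows_to_write(agg_before, agg_after, "update"))

# Non-aggregated table (one row per invoice): rows are only ever added.
inv_before = {"93647513": 337.4}
inv_after = {"93647513": 337.4, "93647514": 780.0}
print("append:  ", rows_to_write(inv_before, inv_after, "append"))
```

In the non-aggregated case, append and update select the same rows, which is the equivalence noted in the Update Mode description.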

Note: Our use case uses Append mode, because each invoice produces one new notification row and rows already written never change.

4. Sending Notification:

We do not have the infrastructure to send text messages to customers, so this step is handled by sending a JSON message to another Kafka topic, "notification". The assumption is that a downstream pipeline reads the messages from that topic and notifies customers about their earned loyalty points.

#################continuing#################
print('writing to notification topic')
# At this point we can generate the notification message for the customer, with the
# card number, total shopping amount, and reward points earned.
# Note: the Kafka sink needs a checkpoint location. The option below was added for
# that reason, and the directory name "chk-point-dir" is only a placeholder.
notification_writer_query = kafka_target_df \
    .writeStream \
    .queryName("Notification Writer") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "notification") \
    .option("checkpointLocation", "chk-point-dir") \
    .outputMode("append") \
    .start()
notification_writer_query.awaitTermination()
Notification messages

Notification messages are available on the topic.
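For readers who want to picture a message on the topic, the sketch below rebuilds one key-value pair in plain Python, following the shape of the selectExpr in the consumer code (BillNo as the key, a JSON object as the value). The function name build_notification is hypothetical, and the rounding is added only for readable output; the Spark expression does not round.

```python
import json
from typing import Tuple


def build_notification(bill_no: str, card_no: str, total_value: float,
                       rate: float = 0.2) -> Tuple[str, str]:
    """Return (key, value) strings shaped like the consumer's selectExpr output."""
    payload = {
        "CustomerCardNo": card_no,
        "TotalValue": total_value,
        # Rounded for display only; this is an addition, not part of the pipeline.
        "EarnedLoyaltyPoints": round(total_value * rate, 2),
    }
    return bill_no, json.dumps(payload, separators=(",", ":"))


key, value = build_notification("93647513", "5241-xxxx-xxxx-1142", 1687.0)
print(key, value)
```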

This completes the approach for designing a basic application that processes invoices received from stores, calculates reward points, and sends notifications to customers. I hope this walkthrough was useful.
