

Volga — Open-source Feature Engine for real-time AI — Part 2

Last Updated on April 7, 2024 by Editorial Team

Author(s): Andrey Novitskiy

Originally published on Towards AI.

This is the second part of a 2-post series describing Volga’s architecture and technical details. For motivation and the problem’s background, see the first part.

Volga river

TL;DR

Volga is an open-source real-time feature engine providing a unified compute layer, data models and a consistent API for different types of features: Online, Offline and On-Demand (see the previous post).

It is built on top of Ray and aims to remove the dependency on complex multi-part computation layers in real-time ML systems (Spark+Flink) and/or on managed feature platforms (Tecton.ai, Fennel.ai).

Content:

  • Architecture Overview
  • Sample Pipelines
  • On-Demand Features
  • Scalability
  • Deployment
  • Current State and Future Work

Architecture overview

Volga’s Architecture

Volga’s main goal is to provide a consistent, unified computation layer for all feature types described in the previous post (Online, Offline and On-Demand). To achieve this, it is built on top of Ray’s Distributed Runtime and uses Ray Actors as its main execution units. Let’s look at the moving parts of the system:

  • Client — The entry point of any job/request. Users define online and offline feature calculation pipelines using the declarative syntax of Operators: transform, filter, join, groupby/aggregate, drop (see examples below). The Client is responsible for validating the user’s online/offline pipeline definitions and compiling them into an ExecutionGraph, and for compiling On-Demand feature definitions into an OnDemandDAG.
  • Streaming Workers — Ray Actors executing streaming operators based on the ExecutionGraph produced by the Client. Volga is built using the Kappa architecture, so stream workers perform calculations for both online (unbounded streams) and offline (bounded streams) features. Calculation is performed on the Write Path (asynchronously to user requests). Message exchange between workers is done via ZeroMQ.
  • On-Demand Workers — Ray Actors executing the OnDemandDAG of on-demand tasks. Execution happens on the Read Path (synchronously to the user request), and the result is returned to the user immediately.
  • HotStorage and ColdStorage — Volga expects users to implement two types of interfaces to store processed data: ColdStorage for the offline setting (e.g. MySQL, data lakes, etc.) and HotStorage for real-time access (e.g. Redis, Cassandra, RocksDB). Volga comes with a built-in SimpleInMemoryActorStorage which implements both interfaces and can be used for local development/prototyping.

The graph above demonstrates the high-level execution/data flow for each feature type:

Online (red), Offline (blue), On-Demand (green).

Sample Pipelines

Let’s assume you have a simple order processing system (for a marketplace, say), where user information is stored in a MySQL table ‘users’ and an order (purchase) stream is represented as a Kafka topic ‘orders’.

Let’s build a pipeline that calculates aggregated values over certain time windows of the order stream for each user (e.g. the number of on-sale purchases in the last hour), which we can later use as features either for training an ML model or for real-time inference.

Define general sources first:

from volga.sources import KafkaSource, MysqlSource

kafka = KafkaSource.get(bootstrap_servers='127.0.0.1:9092', username='root', password='')
mysql = MysqlSource.get(host='127.0.0.1', port='3306', user='root', password='', database='db')

Define input data schemas using the @dataset decorator and map them to sources using the @source decorator:

import datetime

from volga.datasets import dataset, field, pipeline
from volga.sources import source

@source(mysql.table('users'))
@dataset
class User:
    user_id: str = field(key=True)
    registered_at: datetime.datetime = field(timestamp=True)
    name: str


@source(kafka.topic('orders'), tag='online')
@source(mysql.table('orders'), tag='offline')
@dataset
class Order:
    buyer_id: str = field(key=True)
    product_id: str = field(key=True)
    product_type: str  # 'ON_SALE' or 'REGULAR'
    purchased_at: datetime.datetime = field(timestamp=True)
    product_price: float

Volga assumes that each source contains timestamped values with the same schema, e.g. for a MySQL table each row should represent a user, and for a Kafka topic each message should be a purchase event. The timestamp field should be annotated with timestamp=True.

We also annotate certain fields with key=True; these keys are used later to retrieve/look up associated values and to perform joins and aggregates. Note that we can reuse a schema for different sources by using tags (e.g. an online source and an offline source).

The last step is to define the output data shape and a function that takes our input datasets (User and Order) and produces the resulting dataset (this function is annotated with the @pipeline decorator):

from volga.pipeline import pipeline

@dataset
class OnSaleUserSpentInfo:
    user_id: str = field(key=True)
    product_id: str = field(key=True)
    timestamp: datetime.datetime = field(timestamp=True)

    avg_spent_7d: float
    avg_spent_1h: float
    num_purchases_1d: int

    @pipeline(inputs=[User, Order])
    def gen(cls, users: Dataset, orders: Dataset):
        on_sale_purchases = orders.filter(lambda df: df['product_type'] == 'ON_SALE')
        per_user = on_sale_purchases.join(users, right_on=['user_id'], left_on=['buyer_id'])

        return per_user.group_by(keys=['user_id']).aggregate([
            Avg(on='product_price', window='7d', into='avg_spent_7d'),
            Avg(on='product_price', window='1h', into='avg_spent_1h'),
            Count(window='1d', into='num_purchases_1d'),
        ])

The code is self-explanatory: we first filter orders to get on-sale purchases, then join them with users (buyer_id to user_id), and finally perform sliding-window aggregates for each unique user_id.

Now that we have all definitions in place, let’s run feature materialization (calculation) jobs. Define the Client and Storage:

from volga import Client
from volga.storage.common.simple_in_memory_actor_storage import SimpleInMemoryActorStorage

storage = SimpleInMemoryActorStorage()
client = Client(hot=storage, cold=storage)

Volga uses the Storage abstraction to interface with the user’s storage infra. It discriminates between two interfaces: ColdStorage for offline materialization and HotStorage for online materialization. Volga comes with a built-in SimpleInMemoryActorStorage, a Ray Actor that stores data in memory and can be used as both cold and hot storage, which makes it a good option for local development.
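For a real deployment you would plug in your own implementations of these interfaces instead. The exact ColdStorage/HotStorage method signatures are not covered in this post, so the following is purely a hypothetical sketch: the put method name and its signature are assumptions for illustration, and only Client(hot=..., cold=...) and SimpleInMemoryActorStorage are taken from the example above.

# Purely hypothetical sketch: the HotStorage method name (put) and its signature
# are assumptions for illustration, not Volga's documented storage API.
import json

import redis  # assumes the redis-py package

from volga import Client
from volga.storage.common.simple_in_memory_actor_storage import SimpleInMemoryActorStorage


class RedisHotStorage:
    """Hypothetical HotStorage implementation backed by Redis."""

    def __init__(self, host: str = '127.0.0.1', port: int = 6379):
        self._redis = redis.Redis(host=host, port=port)

    def put(self, dataset_name: str, keys: dict, record: dict) -> None:
        # keep only the latest record per key for low-latency online reads
        redis_key = f'{dataset_name}:' + ':'.join(f'{k}={v}' for k, v in sorted(keys.items()))
        self._redis.set(redis_key, json.dumps(record))


# hot reads go to Redis, cold reads stay on the built-in in-memory storage
client = Client(hot=RedisHotStorage(), cold=SimpleInMemoryActorStorage())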

Launch the offline materialization job. Based on the user’s pipeline and the defined parallelism or scaling config, this step will build a computation graph, start worker actors with the corresponding operators, run the computation on bounded streams and put the data into cold storage. Note that it will use the MySQL source (tagged ‘offline’) for the Order dataset:

client.materialize_offline(
    target=OnSaleUserSpentInfo,
    source_tags={Order: 'offline'}
)

Getting offline data (e.g. for model training):

historical_on_sale_user_spent_df = client.get_offline_data(
    dataset_name=OnSaleUserSpentInfo.__name__,
    keys={'user_id': 0},
    start=None, end=None  # whole dataset
)

historical_on_sale_user_spent_df
...
user_id product_id timestamp avg_spent_7d avg_spent_1h num_purchases_1d
0 0 prod_0 2024-03-27 11:26:45.514375 100 100 1
1 0 prod_2 2024-03-27 12:24:45.514375 100 100 2
2 0 prod_4 2024-03-27 13:22:45.514375 100 100 3
3 0 prod_6 2024-03-27 14:20:45.514375 100 100 4
4 0 prod_8 2024-03-27 15:18:45.514375 100 100 5

Running online materialization follows the same steps, except the engine runs unbounded streams, uses the Kafka source for the Order dataset and sinks the resulting data into HotStorage:

client.materialize_online(
    target=OnSaleUserSpentInfo,
    source_tags={Order: 'online'},
    _async=True
)

Querying it looks something like this:

import time

live_on_sale_user_spent = None
while True:
    res = client.get_online_latest_data(
        dataset_name=OnSaleUserSpentInfo.__name__,
        keys={'user_id': 0}
    )
    if live_on_sale_user_spent == res:
        # skip same event
        continue
    live_on_sale_user_spent = res
    print(f'[{time.time()}]{res}')

...
[1711537166.856853][{'user_id': '0', 'product_id': 'prod_0', 'timestamp': '2024-03-27 14:59:20.124752', 'avg_spent_7d': 100, 'avg_spent_1h': 100, 'num_purchases_1d': 1}]
[1711537167.867083][{'user_id': '0', 'product_id': 'prod_2', 'timestamp': '2024-03-27 15:57:20.124752', 'avg_spent_7d': 100, 'avg_spent_1h': 100, 'num_purchases_1d': 2}]
[1711537169.8647628][{'user_id': '0', 'product_id': 'prod_4', 'timestamp': '2024-03-27 16:55:20.124752', 'avg_spent_7d': 100, 'avg_spent_1h': 100, 'num_purchases_1d': 3}]

On-Demand Features (Experimental work in progress)

On-Demand features allow performing stateless transformations at request time, in both online and offline settings. This can be helpful when a transformation is too performance-heavy for streaming or when input data is available only at inference request time (e.g. GPS coordinates, meta-model outputs, etc.).

Define the resulting @dataset with an @on_demand function. On-Demand features can depend on regular datasets (with a @pipeline function) as well as on other on-demand datasets; this is configured via the deps=[Dataset] parameter. When materialization is launched (both offline and online), the framework builds an on-demand task DAG and executes it in parallel on the on-demand worker pool (Ray Actors).

from typing import Dict

from volga.on_demand import on_demand

@dataset
class UserOnSaleTransactionTooBig:
    user_id: str = field(key=True)
    tx_id: str = field(key=True)
    tx_ts: datetime.datetime = field(timestamp=True)

    above_7d_avg: bool
    above_1h_avg: bool

    @on_demand(deps=[OnSaleUserSpentInfo])
    def gen(cls, ts: datetime.datetime, transaction: Dict):
        lookup_keys = [{'user_id': transaction['user_id'], 'product_id': transaction['product_id']}]
        on_sale_spent_info = OnSaleUserSpentInfo.get(keys=lookup_keys, ts=ts)  # returns Dict-like object
        above_7d_avg = transaction['tx_amount'] > on_sale_spent_info['avg_spent_7d']
        above_1h_avg = transaction['tx_amount'] > on_sale_spent_info['avg_spent_1h']

        # output schema should match dataset schema
        return {
            'user_id': transaction['user_id'],
            'tx_id': transaction['tx_id'],
            'tx_ts': ts,
            'above_7d_avg': above_7d_avg,
            'above_1h_avg': above_1h_avg
        }

Calculate the on-demand transformation; this can be done against both online and offline storage. This step first checks the availability of all dependency datasets and fails if they are not present. Note that unlike online/offline features, on-demand features are calculated on the Read Path, so we don’t need to separately query the storage layer for results:

res = client.get_on_demand(
    target=UserOnSaleTransactionTooBig,
    online=True,  # False for offline storage source
    start=None, end=None,  # datetime range in case of offline request
    inputs=[{
        'user_id': '1',
        'product_id': 'prod_0',
        'tx_id': 'tx_0',
        'tx_amount': 150
    }]
)
...

{'user_id': '1', 'timestamp': '2024-03-27 14:59:20.124752', 'tx_id': 'tx_0', 'above_7d_avg': 'false', 'above_1h_avg': 'true'}

Scalability

Volga’s Operators have a notion of parallelism (similar to Flink’s parallelism). As an example, consider a simple Order.transform(…) pipeline over the Order dataset from the previous example. Compiling this pipeline into a JobGraph will create the following structure:

SourceOperator → MapOperator → SinkOperator

By default, when the JobGraph is translated into an ExecutionGraph and all operators have a parallelism of 1, the ExecutionGraph will have the same 3-node structure. Setting parallelism to 10 will result in an ExecutionGraph consisting of 10 copies of the above 3-node structure, i.e. 10 similar pipelines (note that it is up to the user to implement partition/storage logic for sources and sinks based on the operator index in a given graph). This works for both offline (bounded streams) and online (unbounded streams) features.

Users can set a job’s parallelism in two ways:

  • Use the parallelism parameter when running materialization. This will set the same parallelism for all operators in a job:
client.materialize_offline(parallelism=10, ...)
  • Use a ScalingConfig. This allows fine-grained parallelism settings on a per-operator basis:
client.materialize_online(scaling_config={'Join_1': 4}, ...)

Volga also allows setting resource requirements (such as CPU shares and heap memory) via a ResourceConfig. This can be done globally, per-job or per-operator.
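The ResourceConfig format itself is not shown in this post, so the snippet below is only a hypothetical illustration of how per-job resource settings might be passed alongside the scaling config from the example above; the resource_config parameter name and its keys are assumptions, not Volga’s documented API.

# Hypothetical illustration only: resource_config and its keys are assumptions;
# scaling_config and the other arguments are taken from the examples above.
client.materialize_online(
    target=OnSaleUserSpentInfo,
    source_tags={Order: 'online'},
    scaling_config={'Join_1': 4},
    resource_config={'num_cpus_per_worker': 1, 'memory_per_worker': '2Gi'},
    _async=True
)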

Running locally and deploying to remote clusters

Volga runs wherever Ray runs, meaning you can launch it locally, on a Kubernetes cluster with KubeRay, on bare AWS EC2 instances or on managed Anyscale.

For a local environment, just run ray start --head and then run your pipeline. See the Volga repo for more examples.
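Putting the pieces together, a local run roughly looks like the sketch below. ray.init is standard Ray API; the client, storage and dataset names are the ones defined earlier in this post, and the script is assumed to run on the same machine as the head node started with ray start --head.

import ray

from volga import Client
from volga.storage.common.simple_in_memory_actor_storage import SimpleInMemoryActorStorage

# connect to the local cluster started with `ray start --head`
ray.init(address='auto')

# use the built-in in-memory storage for prototyping
storage = SimpleInMemoryActorStorage()
client = Client(hot=storage, cold=storage)

# OnSaleUserSpentInfo and Order are the datasets defined earlier in this post
client.materialize_offline(
    target=OnSaleUserSpentInfo,
    source_tags={Order: 'offline'}
)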

Current state and future work

Volga is currently in a proof-of-concept/prototype state. The high-level architecture has been validated and tested, the main moving parts of the streaming engine are in place and have been tested locally, and the user-facing API and data models are done. It is, however, still a work in progress and is missing some key streaming features (mainly around fault tolerance, message delivery guarantees and performance improvements), a fully working on-demand feature implementation, some data connector implementations, and load/scalability testing.

You can see the whole backlog here:

Volga by anovv on github.com

Thank you for reading. Please share, star the project on GitHub and, most importantly, leave your feedback, as it is the main motivation behind this effort.


Published via Towards AI
