Quickstart
What is Feast?
Feast (Feature Store) is an open source feature store that lets teams define, manage, and serve machine learning features consistently for offline training and online inference.
Prerequisites
# create & activate a virtual environment
python -m venv venv/
source venv/bin/activate
Overview
In this tutorial, we use Feast to generate training data and power online inference for a ride-sharing driver satisfaction model, following the steps below.
Step 1: Install Feast
pip install feast
Step 2: Create a feature repository
feast init my_project
cd my_project/feature_repo
Creating a new Feast repository in /home/Jovyan/my_project.
The generated feature_store.yaml contains the feature store's configuration:
project: my_project
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
online_store:
type: sqlite
path: data/online_store.db
entity_key_serialization_version: 3
The generated example_repo.py contains example feature definitions:
# This is an example feature definition file
from datetime import timedelta
import pandas as pd
from feast import (
Entity,
FeatureService,
FeatureView,
Field,
FileSource,
Project,
PushSource,
RequestSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64
# Define a project for the feature repo
project = Project(name="my_project", description="A project for driver statistics")
# Define an entity for the driver. You can think of an entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])
# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_stats_source = FileSource(
name="driver_hourly_stats_source",
path="%PARQUET_PATH%",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature columns. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_stats_fv = FeatureView(
# The unique name of this feature view. Two feature views in a single
# project cannot have the same name
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
# The list of features defined below acts as a schema: it is used both when
# materializing features into a store and as a reference during retrieval
# for building a training dataset or serving features
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"),
],
online=True,
source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
)
# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema=[
Field(name="val_to_add", dtype=Int64),
Field(name="val_to_add_2", dtype=Int64),
],
)
# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
sources=[driver_stats_fv, input_request],
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df
# This groups features into a model version
driver_activity_v1 = FeatureService(
name="driver_activity_v1",
features=[
driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view
transformed_conv_rate, # Selects all features from the feature view
],
)
driver_activity_v2 = FeatureService(
name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)
# Defines a way to push data (to be available offline, online or both) into Feast.
driver_stats_push_source = PushSource(
name="driver_stats_push_source",
batch_source=driver_stats_source,
)
# Defines a slightly modified version of the feature view from above, where the source
# has been changed to the push source. This allows fresh features to be directly pushed
# to the online store for this feature view.
driver_stats_fresh_fv = FeatureView(
name="driver_hourly_stats_fresh",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_push_source, # Changed from above
tags={"team": "driver_performance"},
)
# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df
driver_activity_v3 = FeatureService(
name="driver_activity_v3",
features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
)
Inspecting the raw data
The demo's raw feature data is a local parquet file containing hourly driver statistics:
import pandas as pd
pd.read_parquet("data/driver_stats.parquet")
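Running this in a plain Python script (rather than a notebook) won't display anything on its own. An optional, quick check that the file has the columns the feature definitions above expect could look like this:
import pandas as pd

df = pd.read_parquet("data/driver_stats.parquet")
# Expect driver_id, event_timestamp, conv_rate, acc_rate, avg_daily_trips, created
print(df.dtypes)
print(df.head())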
Step 3: Run sample workflow
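The repository generated by feast init also includes a test_workflow.py script (revisited in Step 10) that exercises the full workflow below. If you prefer, you can run it end to end with python test_workflow.py, then step through the remaining sections to see what each command does.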
Step 4: Register feature definitions and deploy your feature store
The apply command scans the Python files in the current directory for feature definitions, registers them, and deploys infrastructure (here, SQLite tables for the online store):
feast apply
Created entity driver
Created feature view driver_hourly_stats
Created feature view driver_hourly_stats_fresh
Created on demand feature view transformed_conv_rate
Created on demand feature view transformed_conv_rate_fresh
Created feature service driver_activity_v3
Created feature service driver_activity_v1
Created feature service driver_activity_v2
Created sqlite table my_project_driver_hourly_stats_fresh
Created sqlite table my_project_driver_hourly_stats
Step 5: Generating training data or powering batch scoring models
To train a model we need both features and labels; Feast joins the entity dataframe below (entity keys, timestamps, and labels) to the feature values that were current as of each timestamp, i.e. a point-in-time correct join.
Generating training data
from datetime import datetime
import pandas as pd
from feast import FeatureStore
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for
# more details on how to retrieve for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
{
# entity's join key -> entity values
"driver_id": [1001, 1002, 1003],
# "event_timestamp" (reserved key) -> timestamps
"event_timestamp": [
datetime(2021, 4, 12, 10, 59, 42),
datetime(2021, 4, 12, 8, 12, 10),
datetime(2021, 4, 12, 16, 40, 26),
],
# (optional) label name -> label values. Feast does not process these
"label_driver_reported_satisfaction": [1, 5, 3],
# values we're using for an on-demand transformation
"val_to_add": [1, 2, 3],
"val_to_add_2": [10, 20, 30],
}
)
store = FeatureStore(repo_path=".")
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
],
).to_df()
print("----- Feature schema -----\n")
print(training_df.info())
print()
print("----- Example features -----\n")
print(training_df.head())
----- Feature schema -----
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 10 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 driver_id 3 non-null int64
1 event_timestamp 3 non-null datetime64[ns, UTC]
2 label_driver_reported_satisfaction 3 non-null int64
3 val_to_add 3 non-null int64
4 val_to_add_2 3 non-null int64
5 conv_rate 3 non-null float32
6 acc_rate 3 non-null float32
7 avg_daily_trips 3 non-null int32
8 conv_rate_plus_val1 3 non-null float64
9 conv_rate_plus_val2 3 non-null float64
dtypes: datetime64[ns, UTC](1), float32(2), float64(2), int32(1), int64(4)
memory usage: 336.0 bytes
None
----- Example features -----
driver_id event_timestamp label_driver_reported_satisfaction \
0 1001 2021-04-12 10:59:42+00:00 1
1 1002 2021-04-12 08:12:10+00:00 5
2 1003 2021-04-12 16:40:26+00:00 3
val_to_add val_to_add_2 conv_rate acc_rate avg_daily_trips \
0 1 10 0.800648 0.265174 643
1 2 20 0.644141 0.996602 765
2 3 30 0.855432 0.546345 954
conv_rate_plus_val1 conv_rate_plus_val2
0 1.800648 10.800648
1 2.644141 20.644141
2 3.855432 30.855432
Run offline inference (batch scoring)
To power a batch model, we generate predictions on the latest feature values by simply setting the entity dataframe's timestamps to the current time:
entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
],
).to_df()
print("\n----- Example features -----\n")
print(training_df.head())
----- Example features -----
driver_id event_timestamp \
0 1001 2024-04-19 14:58:16.452895+00:00
1 1002 2024-04-19 14:58:16.452895+00:00
2 1003 2024-04-19 14:58:16.452895+00:00
label_driver_reported_satisfaction val_to_add val_to_add_2 conv_rate \
0 1 1 10 0.535773
1 5 2 20 0.171976
2 3 3 30 0.275669
acc_rate avg_daily_trips conv_rate_plus_val1 conv_rate_plus_val2
0 0.689705 428 1.535773 10.535773
1 0.737113 369 2.171976 20.171976
2 0.156630 116 3.275669 30.275669
Step 6: Ingest batch features into your online store
We now load feature values into the online store to prepare for serving; materialize-incremental ingests all new feature values since the last materialize call:
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME
# Alternative: Materialize all data using current timestamp (for data without event timestamps)
feast materialize --disable-event-timestamp
Materializing 2 feature views to 2024-04-19 10:59:58-04:00 into the sqlite online store.
driver_hourly_stats from 2024-04-18 15:00:46-04:00 to 2024-04-19 10:59:58-04:00:
100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 370.32it/s]
driver_hourly_stats_fresh from 2024-04-18 15:00:46-04:00 to 2024-04-19 10:59:58-04:00:
100%|███████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 1046.64it/s]
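The driver_stats_push_source defined in Step 2 offers another way to get fresh feature values into the online store, without waiting for a batch materialization. A minimal sketch (the event values below are made up for illustration; the pushed dataframe must contain the same columns as the batch source):
from datetime import datetime

import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode

store = FeatureStore(repo_path=".")

# Hypothetical fresh event for driver 1001; the values here are illustrative only.
event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [datetime.now()],
        "created": [datetime.now()],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
    }
)
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)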
Step 7: Fetching feature vectors for inference
At inference time, we fetch the latest feature values for the given driver IDs from the online store:
from pprint import pprint
from feast import FeatureStore
store = FeatureStore(repo_path=".")
feature_vector = store.get_online_features(
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
entity_rows=[
# {join_key: entity_value}
{"driver_id": 1004},
{"driver_id": 1005},
],
).to_dict()
pprint(feature_vector)
{
'acc_rate': [0.25351759791374207, 0.8949751853942871],
'avg_daily_trips': [712, 791],
'conv_rate': [0.5038306713104248, 0.9839504361152649],
'driver_id': [1004, 1005]
}
Step 8: Using a feature service to fetch online features instead
A feature service groups the features a model version needs and decouples model versions from individual feature views; driver_activity_v1 below was already registered via example_repo.py:
from feast import FeatureService
driver_stats_fs = FeatureService(
name="driver_activity_v1", features=[driver_stats_fv]
)
from pprint import pprint
from feast import FeatureStore
feature_store = FeatureStore('.') # Initialize the feature store
feature_service = feature_store.get_feature_service("driver_activity_v1")
feature_vector = feature_store.get_online_features(
features=feature_service,
entity_rows=[
# {join_key: entity_value}
{"driver_id": 1004},
{"driver_id": 1005},
],
).to_dict()
pprint(feature_vector)
{
'acc_rate': [0.5732735991477966, 0.7828438878059387],
'avg_daily_trips': [33, 984],
'conv_rate': [0.15498852729797363, 0.6263588070869446],
'driver_id': [1004, 1005]
}
Step 9: Browse your features with the Web UI (experimental)
View all registered features, data sources, entities, and feature services with the Web UI:
feast ui
INFO:     Started server process [66664]
08/17/2022 01:25:49 PM uvicorn.error INFO: Started server process [66664]
INFO: Waiting for application startup.
08/17/2022 01:25:49 PM uvicorn.error INFO: Waiting for application startup.
INFO: Application startup complete.
08/17/2022 01:25:49 PM uvicorn.error INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8888 (Press CTRL+C to quit)
08/17/2022 01:25:49 PM uvicorn.error INFO: Uvicorn running on http://0.0.0.0:8888 (Press CTRL+C to quit)
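Once the server is running, open http://localhost:8888 in a browser to explore the entities, feature views, and feature services registered above.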
Step 10: Re-examine test_workflow.py
Take another look at test_workflow.py; it showcases the main flows for interacting with Feast that you ran through above.
test_workflow.py
Next steps