feat: Offline Store historical features retrieval based on datetime range in dask#5717
feat: Offline Store historical features retrieval based on datetime range in dask#5717ntkathole merged 11 commits intofeast-dev:masterfrom
Conversation
…atatime range for Dask Signed-off-by: Aniket Paluskar <apaluska@redhat.com>
There was a problem hiding this comment.
Pull Request Overview
This PR adds non-entity mode historical feature retrieval to the Dask offline store, enabling users to retrieve features over a time range (start_date/end_date) without providing an entity_df.
Key changes:
- Makes
entity_dfoptional inDaskOfflineStore.get_historical_featuresand acceptsstart_date/end_datevia kwargs - Synthesizes a minimal one-row entity_df with only the event_timestamp column to reuse existing join and metadata logic
- Implements cross-join fallback when join keys are absent from the synthetic entity_df, relying on TTL filtering and deduplication for correctness
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| sdk/python/feast/infra/offline_stores/dask.py | Implements non-entity mode by making entity_df optional, synthesizing a minimal entity_df when None, and using cross-join logic when join keys are absent |
| sdk/python/tests/unit/infra/offline_stores/test_dask_non_entity.py | Adds unit test verifying that the API accepts start_date/end_date parameters in non-entity mode and returns a valid RetrievalJob |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | ||
| if start_date is None: | ||
| max_ttl_seconds = 0 | ||
| for fv in feature_views: | ||
| if fv.ttl and isinstance(fv.ttl, timedelta): | ||
| max_ttl_seconds = max( | ||
| max_ttl_seconds, int(fv.ttl.total_seconds()) | ||
| ) | ||
| if max_ttl_seconds > 0: | ||
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | ||
| else: | ||
| # Keep default window bounded to avoid unbounded scans by default. | ||
| start_date = end_date - timedelta(days=30) | ||
|
|
There was a problem hiding this comment.
The start_date parameter is not enforced in the actual data filtering - only the end_date is used (along with TTL). The filtering logic in _filter_ttl will use end_date - ttl as the lower bound, not the user-provided start_date. This means if a user provides start_date that is later than end_date - ttl, they may get more data than expected.
Consider adding a validation check or warning when start_date is provided but will be overridden by TTL logic, or add a TODO comment indicating that proper start_date filtering should be implemented in a follow-up.
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | |
| if start_date is None: | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| start_date = end_date - timedelta(days=30) | |
| # Compute TTL-based lower bound for start_date. | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| ttl_lower_bound = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| ttl_lower_bound = end_date - timedelta(days=30) | |
| # If user provided start_date, use the max of user start_date and ttl_lower_bound. | |
| if start_date is not None: | |
| if start_date < ttl_lower_bound: | |
| import warnings | |
| warnings.warn( | |
| f"Provided start_date ({start_date}) is earlier than TTL-based lower bound ({ttl_lower_bound}). Overriding start_date to {ttl_lower_bound}." | |
| ) | |
| start_date = max(start_date, ttl_lower_bound) | |
| else: | |
| start_date = ttl_lower_bound |
|
|
||
| fv = _mock_feature_view() | ||
|
|
||
| # Expect this to work once non-entity mode is implemented for Dask-based store |
There was a problem hiding this comment.
The comment is outdated. Since the non-entity mode is now implemented (not just expected to work in the future), this comment should be updated to describe what the test actually validates.
Consider updating to something like:
# Verify that non-entity mode (entity_df=None) accepts start_date and end_date parameters| # Expect this to work once non-entity mode is implemented for Dask-based store | |
| # Verify that non-entity mode (entity_df=None) accepts start_date and end_date parameters |
| end_date=datetime(2023, 1, 2, tzinfo=timezone.utc), | ||
| ) | ||
|
|
||
| # When implemented, should return a RetrievalJob |
There was a problem hiding this comment.
The comment is outdated. Since the implementation is complete, this comment should be updated to describe what is being asserted.
Consider updating to something like:
# Should return a RetrievalJob instance| # When implemented, should return a RetrievalJob | |
| # Should return a RetrievalJob instance |
| # df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) | ||
|
|
There was a problem hiding this comment.
Commented-out code should be removed. If this line is no longer needed due to the refactoring below, it should be deleted rather than commented out.
| # df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) |
| entity_df, dd.DataFrame | ||
| ): | ||
| raise ValueError( | ||
| f"Please provide an entity_df of type {type(pd.DataFrame)} or dask.dataframe instead of type {type(entity_df)}" |
There was a problem hiding this comment.
The error message formatting is incorrect. The f-string should format pd.DataFrame as a string, not call type() on it. This will result in an error message like "...type <class 'type'> instead of type <class 'type'>".
Consider changing to:
f"Please provide an entity_df of type pd.DataFrame or dask.dataframe.DataFrame instead of type {type(entity_df)}"| f"Please provide an entity_df of type {type(pd.DataFrame)} or dask.dataframe instead of type {type(entity_df)}" | |
| f"Please provide an entity_df of type pd.DataFrame or dask.dataframe.DataFrame instead of type {type(entity_df)}" |
| if non_entity_mode and any( | ||
| k not in entity_df_with_features.columns for k in join_keys | ||
| ): |
There was a problem hiding this comment.
[nitpick] The logic for checking missing join keys could be simplified. In non-entity mode, the synthetic entity_df will never contain join keys (it only has the event_timestamp column). Therefore, the any() check is unnecessary overhead.
Consider simplifying to:
current_join_keys = join_keys
if non_entity_mode:
current_join_keys = []This makes the logic clearer and more efficient since we know join keys are never present in the synthetic entity_df.
| if non_entity_mode and any( | |
| k not in entity_df_with_features.columns for k in join_keys | |
| ): | |
| if non_entity_mode: |
|
|
||
| if non_entity_mode: | ||
| # Default end_date to current time (UTC) to keep behavior predictable without extra parameters. | ||
| end_date = end_date or datetime.now(timezone.utc) |
There was a problem hiding this comment.
| end_date = end_date or datetime.now(timezone.utc) | |
| end_date = make_tzaware(end_date) or datetime.now(timezone.utc) |
| end_date = end_date or datetime.now(timezone.utc) | ||
|
|
||
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | ||
| if start_date is None: |
There was a problem hiding this comment.
If start_date is given you have to make it tzaware ?
| # Minimal synthetic entity_df: one timestamp row; join keys are not materialized here on purpose to avoid | ||
| # accidental dependence on specific feature view schemas at this layer. | ||
| entity_df = pd.DataFrame( | ||
| {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]} |
There was a problem hiding this comment.
you havent given start date and tz. Both matters very much. It should be:
start=start_date, end=end_date, freq="1s", tz=timezone.utc
| entity_df, dd.DataFrame | ||
| ): | ||
| raise ValueError( | ||
| f"Please provide an entity_df of type {type(pd.DataFrame)} or dask.dataframe instead of type {type(entity_df)}" |
| if non_entity_mode and any( | ||
| k not in entity_df_with_features.columns for k in join_keys | ||
| ): |
| # When implemented, should return a RetrievalJob | ||
| from feast.infra.offline_stores.offline_store import RetrievalJob | ||
|
|
||
| assert isinstance(retrieval_job, RetrievalJob) |
There was a problem hiding this comment.
I dont think this is good enough to validate the data based retrieval
Signed-off-by: Aniket Paluskar <apaluska@redhat.com>
…ut using datetime Signed-off-by: Aniket Paluskar <apaluska@redhat.com>
# [0.58.0](v0.57.0...v0.58.0) (2025-12-16) ### Bug Fixes * Add java proto ([#5719](#5719)) ([fc3ea20](fc3ea20)) * Add possibility to force full features names for materialize ops ([#5728](#5728)) ([55c9c36](55c9c36)) * Fixed file registry cache sync ([09505d4](09505d4)) * Handle hyphon in sqlite project name ([#5575](#5575)) ([#5749](#5749)) ([b8346ff](b8346ff)) * Pinned substrait to fix protobuf issue ([d0ef4da](d0ef4da)) * Set TLS certificate annotation only on gRPC service ([#5715](#5715)) ([75d13db](75d13db)) * SQLite online store deletes tables from other projects in shared registry scenarios ([#5766](#5766)) ([fabce76](fabce76)) * Validate not existing entity join keys for preventing panic ([0b93559](0b93559)) ### Features * Add annotations for pod templates ([534e647](534e647)) * Add Pytorch template ([#5780](#5780)) ([6afd353](6afd353)) * Add support for extra options for stream source ([#5618](#5618)) ([18956c2](18956c2)) * Added matched_tag field search api results with fuzzy search capabilities ([#5769](#5769)) ([4a9ffae](4a9ffae)) * Added support for enabling metrics in Feast Operator ([#5317](#5317)) ([#5748](#5748)) ([a8498c2](a8498c2)) * Configure CacheTTLSecondscache,CacheMode for file-based registry in Feast Operator([#5708](#5708)) ([#5744](#5744)) ([f25f83b](f25f83b)) * Implemented Tiling Support for Time-Windowed Aggregations ([#5724](#5724)) ([7a99166](7a99166)) * Offline Store historical features retrieval based on datetime range for spark ([#5720](#5720)) ([27ec8ec](27ec8ec)) * Offline Store historical features retrieval based on datetime range in dask ([#5717](#5717)) ([a16582a](a16582a)) * Production ready feast operator with v1 apiversion ([#5771](#5771)) ([49359c6](49359c6)) * Support for Map value data type ([#5768](#5768)) ([#5772](#5772)) ([b99a8a9](b99a8a9))
# [0.58.0](feast-dev/feast@v0.57.0...v0.58.0) (2025-12-16) ### Bug Fixes * Add java proto ([feast-dev#5719](feast-dev#5719)) ([fc3ea20](feast-dev@fc3ea20)) * Add possibility to force full features names for materialize ops ([feast-dev#5728](feast-dev#5728)) ([55c9c36](feast-dev@55c9c36)) * Fixed file registry cache sync ([09505d4](feast-dev@09505d4)) * Handle hyphon in sqlite project name ([feast-dev#5575](feast-dev#5575)) ([feast-dev#5749](feast-dev#5749)) ([b8346ff](feast-dev@b8346ff)) * Pinned substrait to fix protobuf issue ([d0ef4da](feast-dev@d0ef4da)) * Set TLS certificate annotation only on gRPC service ([feast-dev#5715](feast-dev#5715)) ([75d13db](feast-dev@75d13db)) * SQLite online store deletes tables from other projects in shared registry scenarios ([feast-dev#5766](feast-dev#5766)) ([fabce76](feast-dev@fabce76)) * Validate not existing entity join keys for preventing panic ([0b93559](feast-dev@0b93559)) ### Features * Add annotations for pod templates ([534e647](feast-dev@534e647)) * Add Pytorch template ([feast-dev#5780](feast-dev#5780)) ([6afd353](feast-dev@6afd353)) * Add support for extra options for stream source ([feast-dev#5618](feast-dev#5618)) ([18956c2](feast-dev@18956c2)) * Added matched_tag field search api results with fuzzy search capabilities ([feast-dev#5769](feast-dev#5769)) ([4a9ffae](feast-dev@4a9ffae)) * Added support for enabling metrics in Feast Operator ([feast-dev#5317](feast-dev#5317)) ([feast-dev#5748](feast-dev#5748)) ([a8498c2](feast-dev@a8498c2)) * Configure CacheTTLSecondscache,CacheMode for file-based registry in Feast Operator([feast-dev#5708](feast-dev#5708)) ([feast-dev#5744](feast-dev#5744)) ([f25f83b](feast-dev@f25f83b)) * Implemented Tiling Support for Time-Windowed Aggregations ([feast-dev#5724](feast-dev#5724)) ([7a99166](feast-dev@7a99166)) * Offline Store historical features retrieval based on datetime range for spark ([feast-dev#5720](feast-dev#5720)) ([27ec8ec](feast-dev@27ec8ec)) * Offline Store historical features retrieval based on datetime range in dask ([feast-dev#5717](feast-dev#5717)) ([a16582a](feast-dev@a16582a)) * Production ready feast operator with v1 apiversion ([feast-dev#5771](feast-dev#5771)) ([49359c6](feast-dev@49359c6)) * Support for Map value data type ([feast-dev#5768](feast-dev#5768)) ([feast-dev#5772](feast-dev#5772)) ([b99a8a9](feast-dev@b99a8a9)) Signed-off-by: Anthonette Adanyin <106275232+antznette1@users.noreply.github.com>
# [0.58.0](v0.57.0...v0.58.0) (2025-12-16) ### Bug Fixes * Add java proto ([#5719](#5719)) ([fc3ea20](fc3ea20)) * Add possibility to force full features names for materialize ops ([#5728](#5728)) ([55c9c36](55c9c36)) * Fixed file registry cache sync ([09505d4](09505d4)) * Handle hyphon in sqlite project name ([#5575](#5575)) ([#5749](#5749)) ([b8346ff](b8346ff)) * Pinned substrait to fix protobuf issue ([d0ef4da](d0ef4da)) * Set TLS certificate annotation only on gRPC service ([#5715](#5715)) ([75d13db](75d13db)) * SQLite online store deletes tables from other projects in shared registry scenarios ([#5766](#5766)) ([fabce76](fabce76)) * Validate not existing entity join keys for preventing panic ([0b93559](0b93559)) ### Features * Add annotations for pod templates ([534e647](534e647)) * Add Pytorch template ([#5780](#5780)) ([6afd353](6afd353)) * Add support for extra options for stream source ([#5618](#5618)) ([18956c2](18956c2)) * Added matched_tag field search api results with fuzzy search capabilities ([#5769](#5769)) ([4a9ffae](4a9ffae)) * Added support for enabling metrics in Feast Operator ([#5317](#5317)) ([#5748](#5748)) ([a8498c2](a8498c2)) * Configure CacheTTLSecondscache,CacheMode for file-based registry in Feast Operator([#5708](#5708)) ([#5744](#5744)) ([f25f83b](f25f83b)) * Implemented Tiling Support for Time-Windowed Aggregations ([#5724](#5724)) ([7a99166](7a99166)) * Offline Store historical features retrieval based on datetime range for spark ([#5720](#5720)) ([27ec8ec](27ec8ec)) * Offline Store historical features retrieval based on datetime range in dask ([#5717](#5717)) ([a16582a](a16582a)) * Production ready feast operator with v1 apiversion ([#5771](#5771)) ([49359c6](49359c6)) * Support for Map value data type ([#5768](#5768)) ([#5772](#5772)) ([b99a8a9](b99a8a9)) Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
What this PR does / why we need it:
Adds start/end-only historical retrieval to Dask offline store, enabling users to fetch features over a time range without providing an entity_df.
Makes entity_df optional in DaskOfflineStore.get_historical_features and accepts start_date/end_date via kwargs.
In non-entity mode:
Which issue(s) this PR fixes:
RHOAIENG-37451
Misc