148148)
149149
150150
151+ feast_metadata = Table (
152+ "feast_metadata" ,
153+ metadata ,
154+ Column ("metadata_key" , String (50 ), primary_key = True ),
155+ Column ("metadata_value" , String (50 ), nullable = False ),
156+ Column ("last_updated_timestamp" , BigInteger , nullable = False ),
157+ )
158+
159+
151160class SqlRegistry (BaseRegistry ):
152161 def __init__ (
153162 self , registry_config : Optional [RegistryConfig ], repo_path : Optional [Path ]
@@ -575,7 +584,6 @@ def get_user_metadata(
575584 def proto (self ) -> RegistryProto :
576585 r = RegistryProto ()
577586 project = ""
578- # TODO(achal): Support Infra object, and last_updated_timestamp.
579587 for lister , registry_proto_field in [
580588 (self .list_entities , r .entities ),
581589 (self .list_feature_views , r .feature_views ),
@@ -591,6 +599,11 @@ def proto(self) -> RegistryProto:
591599 if objs :
592600 registry_proto_field .extend ([obj .to_proto () for obj in objs ])
593601
602+ r .infra .CopyFrom (self .get_infra (project ).to_proto ())
603+ last_updated_timestamp = self ._get_last_updated_metadata ()
604+ if last_updated_timestamp :
605+ r .last_updated .FromDatetime (last_updated_timestamp )
606+
594607 return r
595608
596609 def commit (self ):
@@ -626,13 +639,15 @@ def _apply_object(self, table, id_field_name, obj, proto_field_name, name=None):
626639 }
627640 insert_stmt = insert (table ).values (values ,)
628641 conn .execute (insert_stmt )
642+ self ._set_last_updated_metadata (update_datetime )
629643
630644 def _delete_object (self , table , name , project , id_field_name , not_found_exception ):
631645 with self .engine .connect () as conn :
632646 stmt = delete (table ).where (getattr (table .c , id_field_name ) == name )
633647 rows = conn .execute (stmt )
634648 if rows .rowcount < 1 and not_found_exception :
635649 raise not_found_exception (name , project )
650+ self ._set_last_updated_metadata (datetime .utcnow ())
636651 return rows .rowcount
637652
638653 def _get_object (
@@ -666,3 +681,40 @@ def _list_objects(self, table, proto_class, python_class, proto_field_name):
666681 for row in rows
667682 ]
668683 return []
684+
685+ def _set_last_updated_metadata (self , last_updated : datetime ):
686+ with self .engine .connect () as conn :
687+ stmt = select (feast_metadata ).where (
688+ feast_metadata .c .metadata_key == "last_updated_timestamp"
689+ )
690+ row = conn .execute (stmt ).first ()
691+
692+ update_time = int (last_updated .timestamp ())
693+
694+ values = {
695+ "metadata_key" : "last_updated_timestamp" ,
696+ "metadata_value" : f"{ update_time } " ,
697+ "last_updated_timestamp" : update_time ,
698+ }
699+ if row :
700+ update_stmt = (
701+ update (feast_metadata )
702+ .where (feast_metadata .c .metadata_key == "last_updated_timestamp" )
703+ .values (values )
704+ )
705+ conn .execute (update_stmt )
706+ else :
707+ insert_stmt = insert (feast_metadata ).values (values ,)
708+ conn .execute (insert_stmt )
709+
710+ def _get_last_updated_metadata (self ):
711+ with self .engine .connect () as conn :
712+ stmt = select (feast_metadata ).where (
713+ feast_metadata .c .metadata_key == "last_updated_timestamp"
714+ )
715+ row = conn .execute (stmt ).first ()
716+ if not row :
717+ return None
718+ update_time = int (row ["last_updated_timestamp" ])
719+
720+ return datetime .utcfromtimestamp (update_time )
0 commit comments