@@ -1018,7 +1018,7 @@ def _apply_object(
10181018 self .get_project (name = project , allow_cache = False ), commit = True
10191019 )
10201020 if not self .purge_feast_metadata :
1021- self ._set_last_updated_metadata (update_datetime , project )
1021+ self ._set_last_updated_metadata (update_datetime , project , conn )
10221022
10231023 def _maybe_init_project_metadata (self , project ):
10241024 # Initialize project metadata if needed
@@ -1060,7 +1060,7 @@ def _delete_object(
10601060 self .get_project (name = project , allow_cache = False ), commit = True
10611061 )
10621062 if not self .purge_feast_metadata :
1063- self ._set_last_updated_metadata (_utc_now (), project )
1063+ self ._set_last_updated_metadata (_utc_now (), project , conn )
10641064
10651065 return rows .rowcount
10661066
@@ -1111,39 +1111,45 @@ def _list_objects(
11111111 return objects
11121112 return []
11131113
1114- def _set_last_updated_metadata (self , last_updated : datetime , project : str ):
1115- with self .write_engine .begin () as conn :
1116- stmt = select (feast_metadata ).where (
1117- feast_metadata .c .metadata_key
1118- == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
1119- feast_metadata .c .project_id == project ,
1120- )
1121- row = conn .execute (stmt ).first ()
1122-
1123- update_time = int (last_updated .timestamp ())
1114+ def _set_last_updated_metadata (
1115+ self , last_updated : datetime , project : str , conn = None
1116+ ):
1117+ if conn is None :
1118+ with self .write_engine .begin () as conn :
1119+ self ._set_last_updated_metadata (last_updated , project , conn )
1120+ return
11241121
1125- values = {
1126- "metadata_key" : FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
1127- "metadata_value" : f"{ update_time } " ,
1128- "last_updated_timestamp" : update_time ,
1129- "project_id" : project ,
1130- }
1131- if row :
1132- update_stmt = (
1133- update (feast_metadata )
1134- .where (
1135- feast_metadata .c .metadata_key
1136- == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
1137- feast_metadata .c .project_id == project ,
1138- )
1139- .values (values )
1140- )
1141- conn .execute (update_stmt )
1142- else :
1143- insert_stmt = insert (feast_metadata ).values (
1144- values ,
1122+ stmt = select (feast_metadata ).where (
1123+ feast_metadata .c .metadata_key
1124+ == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
1125+ feast_metadata .c .project_id == project ,
1126+ )
1127+ row = conn .execute (stmt ).first ()
1128+
1129+ update_time = int (last_updated .timestamp ())
1130+
1131+ values = {
1132+ "metadata_key" : FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
1133+ "metadata_value" : f"{ update_time } " ,
1134+ "last_updated_timestamp" : update_time ,
1135+ "project_id" : project ,
1136+ }
1137+ if row :
1138+ update_stmt = (
1139+ update (feast_metadata )
1140+ .where (
1141+ feast_metadata .c .metadata_key
1142+ == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
1143+ feast_metadata .c .project_id == project ,
11451144 )
1146- conn .execute (insert_stmt )
1145+ .values (values )
1146+ )
1147+ conn .execute (update_stmt )
1148+ else :
1149+ insert_stmt = insert (feast_metadata ).values (
1150+ values ,
1151+ )
1152+ conn .execute (insert_stmt )
11471153
11481154 def _get_last_updated_metadata (self , project : str ):
11491155 with self .read_engine .begin () as conn :
@@ -1270,36 +1276,41 @@ def delete_project(
12701276
12711277 raise ProjectNotFoundException (name )
12721278
1273- def set_project_metadata (self , project : str , key : str , value : str ):
1279+ def set_project_metadata (self , project : str , key : str , value : str , conn = None ):
12741280 """Set a custom project metadata key-value pair in the feast_metadata table."""
12751281 from feast .utils import _utc_now
12761282
12771283 update_time = int (_utc_now ().timestamp ())
1278- with self .write_engine .begin () as conn :
1279- stmt = select (feast_metadata ).where (
1280- feast_metadata .c .project_id == project ,
1281- feast_metadata .c .metadata_key == key ,
1282- )
1283- row = conn .execute (stmt ).first ()
1284- values = {
1285- "metadata_key" : key ,
1286- "metadata_value" : value ,
1287- "last_updated_timestamp" : update_time ,
1288- "project_id" : project ,
1289- }
1290- if row :
1291- update_stmt = (
1292- update (feast_metadata )
1293- .where (
1294- feast_metadata .c .project_id == project ,
1295- feast_metadata .c .metadata_key == key ,
1296- )
1297- .values (values )
1284+
1285+ if conn is None :
1286+ with self .write_engine .begin () as conn :
1287+ self .set_project_metadata (project , key , value , conn )
1288+ return
1289+
1290+ stmt = select (feast_metadata ).where (
1291+ feast_metadata .c .project_id == project ,
1292+ feast_metadata .c .metadata_key == key ,
1293+ )
1294+ row = conn .execute (stmt ).first ()
1295+ values = {
1296+ "metadata_key" : key ,
1297+ "metadata_value" : value ,
1298+ "last_updated_timestamp" : update_time ,
1299+ "project_id" : project ,
1300+ }
1301+ if row :
1302+ update_stmt = (
1303+ update (feast_metadata )
1304+ .where (
1305+ feast_metadata .c .project_id == project ,
1306+ feast_metadata .c .metadata_key == key ,
12981307 )
1299- conn .execute (update_stmt )
1300- else :
1301- insert_stmt = insert (feast_metadata ).values (values )
1302- conn .execute (insert_stmt )
1308+ .values (values )
1309+ )
1310+ conn .execute (update_stmt )
1311+ else :
1312+ insert_stmt = insert (feast_metadata ).values (values )
1313+ conn .execute (insert_stmt )
13031314
13041315 def get_project_metadata (self , project : str , key : str ) -> Optional [str ]:
13051316 """Get a custom project metadata value by key from the feast_metadata table."""
0 commit comments