Skip to content

Commit 0052754

Browse files
authored
fix: Fixed transaction handling with SQLite registry (#5588)
1 parent 2f36e7b commit 0052754

File tree

3 files changed

+130
-61
lines changed

3 files changed

+130
-61
lines changed

infra/templates/README.md.jinja2

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<p align="center">
22
<a href="https://feast.dev/">
3-
<img src="docs/assets/feast_logo.png" width="550">
3+
<img src="https://raw.githubusercontent.com/feast-dev/feast/master/docs/assets/feast_logo.png" width="550">
44
</a>
55
</p>
66
<br />
@@ -36,7 +36,7 @@ Feast allows ML platform teams to:
3636
Please see our [documentation](https://docs.feast.dev/) for more information about the project.
3737

3838
## 📐 Architecture
39-
![](docs/assets/feast_marchitecture.png)
39+
![](https://raw.githubusercontent.com/feast-dev/feast/master/docs/assets/feast_marchitecture.png)
4040

4141
The above architecture is the minimal Feast deployment. Want to run the full Feast on Snowflake/GCP/AWS? Click [here](https://docs.feast.dev/how-to-guides/feast-snowflake-gcp-aws).
4242

@@ -60,7 +60,7 @@ feast apply
6060

6161
### 4. Explore your data in the web UI (experimental)
6262

63-
![Web UI](ui/sample.png)
63+
![Web UI](https://raw.githubusercontent.com/feast-dev/feast/master/ui/sample.png)
6464
```commandline
6565
feast ui
6666
```

sdk/python/feast/infra/registry/sql.py

Lines changed: 69 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,7 +1018,7 @@ def _apply_object(
10181018
self.get_project(name=project, allow_cache=False), commit=True
10191019
)
10201020
if not self.purge_feast_metadata:
1021-
self._set_last_updated_metadata(update_datetime, project)
1021+
self._set_last_updated_metadata(update_datetime, project, conn)
10221022

10231023
def _maybe_init_project_metadata(self, project):
10241024
# Initialize project metadata if needed
@@ -1060,7 +1060,7 @@ def _delete_object(
10601060
self.get_project(name=project, allow_cache=False), commit=True
10611061
)
10621062
if not self.purge_feast_metadata:
1063-
self._set_last_updated_metadata(_utc_now(), project)
1063+
self._set_last_updated_metadata(_utc_now(), project, conn)
10641064

10651065
return rows.rowcount
10661066

@@ -1111,39 +1111,45 @@ def _list_objects(
11111111
return objects
11121112
return []
11131113

1114-
def _set_last_updated_metadata(self, last_updated: datetime, project: str):
1115-
with self.write_engine.begin() as conn:
1116-
stmt = select(feast_metadata).where(
1117-
feast_metadata.c.metadata_key
1118-
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
1119-
feast_metadata.c.project_id == project,
1120-
)
1121-
row = conn.execute(stmt).first()
1122-
1123-
update_time = int(last_updated.timestamp())
1114+
def _set_last_updated_metadata(
1115+
self, last_updated: datetime, project: str, conn=None
1116+
):
1117+
if conn is None:
1118+
with self.write_engine.begin() as conn:
1119+
self._set_last_updated_metadata(last_updated, project, conn)
1120+
return
11241121

1125-
values = {
1126-
"metadata_key": FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
1127-
"metadata_value": f"{update_time}",
1128-
"last_updated_timestamp": update_time,
1129-
"project_id": project,
1130-
}
1131-
if row:
1132-
update_stmt = (
1133-
update(feast_metadata)
1134-
.where(
1135-
feast_metadata.c.metadata_key
1136-
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
1137-
feast_metadata.c.project_id == project,
1138-
)
1139-
.values(values)
1140-
)
1141-
conn.execute(update_stmt)
1142-
else:
1143-
insert_stmt = insert(feast_metadata).values(
1144-
values,
1122+
stmt = select(feast_metadata).where(
1123+
feast_metadata.c.metadata_key
1124+
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
1125+
feast_metadata.c.project_id == project,
1126+
)
1127+
row = conn.execute(stmt).first()
1128+
1129+
update_time = int(last_updated.timestamp())
1130+
1131+
values = {
1132+
"metadata_key": FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
1133+
"metadata_value": f"{update_time}",
1134+
"last_updated_timestamp": update_time,
1135+
"project_id": project,
1136+
}
1137+
if row:
1138+
update_stmt = (
1139+
update(feast_metadata)
1140+
.where(
1141+
feast_metadata.c.metadata_key
1142+
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
1143+
feast_metadata.c.project_id == project,
11451144
)
1146-
conn.execute(insert_stmt)
1145+
.values(values)
1146+
)
1147+
conn.execute(update_stmt)
1148+
else:
1149+
insert_stmt = insert(feast_metadata).values(
1150+
values,
1151+
)
1152+
conn.execute(insert_stmt)
11471153

11481154
def _get_last_updated_metadata(self, project: str):
11491155
with self.read_engine.begin() as conn:
@@ -1270,36 +1276,41 @@ def delete_project(
12701276

12711277
raise ProjectNotFoundException(name)
12721278

1273-
def set_project_metadata(self, project: str, key: str, value: str):
1279+
def set_project_metadata(self, project: str, key: str, value: str, conn=None):
12741280
"""Set a custom project metadata key-value pair in the feast_metadata table."""
12751281
from feast.utils import _utc_now
12761282

12771283
update_time = int(_utc_now().timestamp())
1278-
with self.write_engine.begin() as conn:
1279-
stmt = select(feast_metadata).where(
1280-
feast_metadata.c.project_id == project,
1281-
feast_metadata.c.metadata_key == key,
1282-
)
1283-
row = conn.execute(stmt).first()
1284-
values = {
1285-
"metadata_key": key,
1286-
"metadata_value": value,
1287-
"last_updated_timestamp": update_time,
1288-
"project_id": project,
1289-
}
1290-
if row:
1291-
update_stmt = (
1292-
update(feast_metadata)
1293-
.where(
1294-
feast_metadata.c.project_id == project,
1295-
feast_metadata.c.metadata_key == key,
1296-
)
1297-
.values(values)
1284+
1285+
if conn is None:
1286+
with self.write_engine.begin() as conn:
1287+
self.set_project_metadata(project, key, value, conn)
1288+
return
1289+
1290+
stmt = select(feast_metadata).where(
1291+
feast_metadata.c.project_id == project,
1292+
feast_metadata.c.metadata_key == key,
1293+
)
1294+
row = conn.execute(stmt).first()
1295+
values = {
1296+
"metadata_key": key,
1297+
"metadata_value": value,
1298+
"last_updated_timestamp": update_time,
1299+
"project_id": project,
1300+
}
1301+
if row:
1302+
update_stmt = (
1303+
update(feast_metadata)
1304+
.where(
1305+
feast_metadata.c.project_id == project,
1306+
feast_metadata.c.metadata_key == key,
12981307
)
1299-
conn.execute(update_stmt)
1300-
else:
1301-
insert_stmt = insert(feast_metadata).values(values)
1302-
conn.execute(insert_stmt)
1308+
.values(values)
1309+
)
1310+
conn.execute(update_stmt)
1311+
else:
1312+
insert_stmt = insert(feast_metadata).values(values)
1313+
conn.execute(insert_stmt)
13031314

13041315
def get_project_metadata(self, project: str, key: str) -> Optional[str]:
13051316
"""Get a custom project metadata value by key from the feast_metadata table."""
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Copyright 2021 The Feast Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import tempfile
16+
17+
import pytest
18+
19+
from feast.entity import Entity
20+
from feast.infra.registry.sql import SqlRegistry, SqlRegistryConfig
21+
22+
23+
@pytest.fixture
24+
def sqlite_registry():
25+
"""Create a temporary SQLite registry for testing."""
26+
fd, registry_path = tempfile.mkstemp()
27+
registry_config = SqlRegistryConfig(
28+
registry_type="sql",
29+
path=f"sqlite:///{registry_path}",
30+
purge_feast_metadata=False,
31+
)
32+
33+
registry = SqlRegistry(registry_config, "test_project", None)
34+
yield registry
35+
registry.teardown()
36+
37+
38+
def test_sql_registry(sqlite_registry):
39+
"""
40+
Test the SQL registry
41+
"""
42+
entity = Entity(
43+
name="test_entity",
44+
description="Test entity for testing",
45+
tags={"test": "transaction"},
46+
)
47+
sqlite_registry.apply_entity(entity, "test_project")
48+
retrieved_entity = sqlite_registry.get_entity("test_entity", "test_project")
49+
assert retrieved_entity.name == "test_entity"
50+
assert retrieved_entity.description == "Test entity for testing"
51+
52+
sqlite_registry.set_project_metadata("test_project", "test_key", "test_value")
53+
value = sqlite_registry.get_project_metadata("test_project", "test_key")
54+
assert value == "test_value"
55+
56+
sqlite_registry.delete_entity("test_entity", "test_project")
57+
with pytest.raises(Exception):
58+
sqlite_registry.get_entity("test_entity", "test_project")

0 commit comments

Comments
 (0)