From 1404788b907ec5b6f9dd554f63c868af28507c7a Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Thu, 17 Apr 2025 17:32:41 +0000
Subject: [PATCH 1/4] feat: enable local json string validations

---
 bigframes/core/local_data.py       | 21 ++++++++++++++++++---
 tests/system/small/test_session.py | 26 ++++++++++++++++++++++++--
 2 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 70b1741af7..524a4c57a1 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -26,11 +26,12 @@
 
 import geopandas  # type: ignore
 import numpy as np
-import pandas
+import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet  # type: ignore
 
 import bigframes.core.schema as schemata
+import bigframes.core.utils as utils
 import bigframes.dtypes
 
 
@@ -66,7 +67,7 @@ def metadata(self) -> LocalTableMetadata:
         return LocalTableMetadata.from_arrow(self.data)
 
     @classmethod
-    def from_pandas(cls, dataframe: pandas.DataFrame) -> ManagedArrowTable:
+    def from_pandas(cls, dataframe: pd.DataFrame) -> ManagedArrowTable:
         """Creates managed table from pandas. Ignores index, col names must be unique strings"""
         columns: list[pa.ChunkedArray] = []
         fields: list[schemata.SchemaItem] = []
@@ -77,6 +78,8 @@ def from_pandas(cls, dataframe: pandas.DataFrame) -> ManagedArrowTable:
             new_arr, bf_type = _adapt_pandas_series(col)
             columns.append(new_arr)
             fields.append(schemata.SchemaItem(str(name), bf_type))
+            if bf_type == bigframes.dtypes.JSON_DTYPE:
+                _is_valid_json_series(col)
 
         return ManagedArrowTable(
             pa.table(columns, names=column_names), schemata.ArraySchema(tuple(fields))
         )
@@ -226,7 +229,7 @@ def _(
 
 
 def _adapt_pandas_series(
-    series: pandas.Series,
+    series: pd.Series,
 ) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
     # Mostly rely on pyarrow conversions, but have to convert geo without its help.
     if series.dtype == bigframes.dtypes.GEO_DTYPE:
@@ -329,3 +332,15 @@ def _physical_type_replacements(dtype: pa.DataType) -> pa.DataType:
     if dtype in _ARROW_MANAGED_STORAGE_OVERRIDES:
         return _ARROW_MANAGED_STORAGE_OVERRIDES[dtype]
     return dtype
+
+
+def _is_valid_json_series(s: pd.Series):
+    """Validate elements of a Series by attempting JSON parsing."""
+    for data in s:
+        # Skip scalar null values to avoid `TypeError` from `json.loads`.
+        if not utils.is_list_like(data) and pd.isna(data):
+            continue
+        try:
+            json.loads(data)
+        except json.JSONDecodeError as e:
+            raise ValueError(f"Invalid JSON format found: {data!r}") from e
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 24edc91c93..4a59a87a9a 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -962,8 +962,8 @@ def test_read_pandas_json_series(session, write_engine):
     json_data = [
         "1",
         None,
-        '["1","3","5"]',
-        '{"a":1,"b":["x","y"],"c":{"x":[],"z":false}}',
+        '[1,"3",null,{"a":null}]',
+        '{"a":1,"b":["x","y"],"c":{"x":[],"y":null,"z":false}}',
     ]
     expected_series = pd.Series(json_data, dtype=bigframes.dtypes.JSON_DTYPE)
 
@@ -975,6 +975,28 @@ def test_read_pandas_json_series(session, write_engine):
     )
 
 
+@pytest.mark.parametrize(
+    ("write_engine"),
+    [
+        pytest.param("default"),
+        pytest.param("bigquery_inline"),
+        pytest.param("bigquery_load"),
+        pytest.param("bigquery_streaming"),
+    ],
+)
+def test_read_pandas_json_series_w_invalid_json(session, write_engine):
+    json_data = [
+        "False",  # Should be "false"
+    ]
+    pd_s = pd.Series(json_data, dtype=bigframes.dtypes.JSON_DTYPE)
+
+    with pytest.raises(
+        ValueError,
+        match="Invalid JSON format found",
+    ):
+        session.read_pandas(pd_s, write_engine=write_engine)
+
+
 @pytest.mark.parametrize(
     ("write_engine"),
     [

From 46dffe0b1e5129846f3da9c29010320da2a5690d Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Thu, 17 Apr 2025 21:21:37 +0000
Subject: [PATCH 2/4] validate nested json content

---
 bigframes/core/local_data.py       | 48 ++++++++++++++++++++++++------
 tests/system/small/test_session.py | 27 +++++++++++++++++
 2 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 524a4c57a1..fb54fbbed5 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -59,9 +59,6 @@ class ManagedArrowTable:
     schema: schemata.ArraySchema = dataclasses.field(hash=False)
     id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)
 
-    def __post_init__(self):
-        self.validate()
-
     @functools.cached_property
     def metadata(self) -> LocalTableMetadata:
         return LocalTableMetadata.from_arrow(self.data)
@@ -78,12 +75,12 @@ def from_pandas(cls, dataframe: pd.DataFrame) -> ManagedArrowTable:
             new_arr, bf_type = _adapt_pandas_series(col)
             columns.append(new_arr)
             fields.append(schemata.SchemaItem(str(name), bf_type))
-            if bf_type == bigframes.dtypes.JSON_DTYPE:
-                _is_valid_json_series(col)
 
-        return ManagedArrowTable(
+        mat = ManagedArrowTable(
             pa.table(columns, names=column_names), schemata.ArraySchema(tuple(fields))
         )
+        mat.validate(include_context=True)
+        return mat
 
     @classmethod
     def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable:
@@ -94,10 +91,12 @@ def from_pyarrow(self, table: pa.Table) -> ManagedArrowTable:
             columns.append(new_arr)
             fields.append(schemata.SchemaItem(name, bf_type))
 
-        return ManagedArrowTable(
+        mat = ManagedArrowTable(
             pa.table(columns, names=table.column_names),
             schemata.ArraySchema(tuple(fields)),
         )
+        mat.validate()
+        return mat
 
     def to_parquet(
         self,
@@ -143,8 +142,7 @@ def itertuples(
         ):
             yield tuple(row_dict.values())
 
-    def validate(self):
-        # TODO: Content-based validation for some datatypes (eg json, wkt, list) where logical domain is smaller than pyarrow type
+    def validate(self, include_context: bool = False):
         for bf_field, arrow_field in zip(self.schema.items, self.data.schema):
             expected_arrow_type = _get_managed_storage_type(bf_field.dtype)
             arrow_type = arrow_field.type
@@ -153,6 +151,38 @@ def validate(self):
                 raise ValueError(
                     f"Field {bf_field} has arrow array type: {arrow_type}, expected type: {expected_arrow_type}"
                 )
 
+        if include_context:
+            for batch in self.data.to_batches():
+                for field in self.schema.items:
+                    _validate_context(batch.column(field.column), field.dtype)
+
+
+def _validate_context(array: pa.Array, dtype: bigframes.dtypes.Dtype):
+    """
+    Recursively validates the content of a PyArrow Array based on the
+    expected BigFrames dtype, focusing on complex types like JSON, structs,
+    and arrays where the Arrow type alone isn't sufficient.
+    """
+    # TODO: validate GEO data content.
+    if dtype == bigframes.dtypes.JSON_DTYPE:
+        values = array.to_pandas()
+        for data in values:
+            # Skip scalar null values to avoid `TypeError` from `json.loads`.
+            if not utils.is_list_like(data) and pd.isna(data):
+                continue
+            try:
+                # Attempts JSON parsing.
+                json.loads(data)
+            except json.JSONDecodeError as e:
+                raise ValueError(f"Invalid JSON format found: {data!r}") from e
+    elif bigframes.dtypes.is_struct_like(dtype):
+        for field_name, dtype in bigframes.dtypes.get_struct_fields(dtype).items():
+            _validate_context(array.field(field_name), dtype)
+    elif bigframes.dtypes.is_array_like(dtype):
+        return _validate_context(
+            array.flatten(), bigframes.dtypes.get_array_inner_type(dtype)
+        )
+
 
 # Sequential iterator, but could split into batches and leverage parallelism for speed
 def _iter_table(
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 4a59a87a9a..4e0f8376a9 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -1078,6 +1078,33 @@ def test_read_pandas_w_nested_json(session, write_engine):
     pd.testing.assert_series_equal(bq_s, pd_s)
 
 
+@pytest.mark.parametrize(
+    ("write_engine"),
+    [
+        pytest.param("default"),
+        pytest.param("bigquery_inline"),
+        pytest.param("bigquery_load"),
+        pytest.param("bigquery_streaming"),
+    ],
+)
+def test_read_pandas_w_nested_invalid_json(session, write_engine):
+    # TODO: supply a reason why this isn't compatible with pandas 1.x
+    pytest.importorskip("pandas", minversion="2.0.0")
+    data = [
+        [{"json_field": "NULL"}],  # Should be "null"
+    ]
+    pa_array = pa.array(data, type=pa.list_(pa.struct([("json_field", pa.string())])))
+    pd_s = pd.Series(
+        arrays.ArrowExtensionArray(pa_array),  # type: ignore
+        dtype=pd.ArrowDtype(
+            pa.list_(pa.struct([("json_field", bigframes.dtypes.JSON_ARROW_TYPE)]))
+        ),
+    )
+
+    with pytest.raises(ValueError, match="Invalid JSON format found"):
+        session.read_pandas(pd_s, write_engine=write_engine)
+
+
 @pytest.mark.parametrize(
     ("write_engine"),
     [

From 2ede436db5d8fd417663e4622b8c6c2ebfd3eba4 Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Thu, 17 Apr 2025 22:54:05 +0000
Subject: [PATCH 3/4] address comments

---
 bigframes/core/local_data.py | 30 +++++++-----------------------
 1 file changed, 7 insertions(+), 23 deletions(-)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index fb54fbbed5..8bae6839bd 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -31,7 +31,6 @@
 import pyarrow.parquet  # type: ignore
 
 import bigframes.core.schema as schemata
-import bigframes.core.utils as utils
 import bigframes.dtypes
 
 
@@ -79,7 +78,7 @@ def from_pandas(cls, dataframe: pd.DataFrame) -> ManagedArrowTable:
         mat = ManagedArrowTable(
             pa.table(columns, names=column_names), schemata.ArraySchema(tuple(fields))
         )
-        mat.validate(include_context=True)
+        mat.validate(include_content=True)
         return mat
 
     @classmethod
@@ -142,7 +141,7 @@ def itertuples(
         ):
             yield tuple(row_dict.values())
 
-    def validate(self, include_context: bool = False):
+    def validate(self, include_content: bool = False):
         for bf_field, arrow_field in zip(self.schema.items, self.data.schema):
             expected_arrow_type = _get_managed_storage_type(bf_field.dtype)
             arrow_type = arrow_field.type
@@ -151,13 +150,13 @@ def validate(self, include_context: bool = False):
                     f"Field {bf_field} has arrow array type: {arrow_type}, expected type: {expected_arrow_type}"
                 )
 
-        if include_context:
+        if include_content:
             for batch in self.data.to_batches():
                 for field in self.schema.items:
-                    _validate_context(batch.column(field.column), field.dtype)
+                    _validate_content(batch.column(field.column), field.dtype)
 
 
-def _validate_context(array: pa.Array, dtype: bigframes.dtypes.Dtype):
+def _validate_content(array: pa.Array, dtype: bigframes.dtypes.Dtype):
     """
     Recursively validates the content of a PyArrow Array based on the
     expected BigFrames dtype, focusing on complex types like JSON, structs,
     and arrays where the Arrow type alone isn't sufficient.
     """
@@ -167,9 +166,6 @@ def _validate_context(array: pa.Array, dtype: bigframes.dtypes.Dtype):
     if dtype == bigframes.dtypes.JSON_DTYPE:
         values = array.to_pandas()
         for data in values:
-            # Skip scalar null values to avoid `TypeError` from `json.loads`.
-            if not utils.is_list_like(data) and pd.isna(data):
-                continue
             try:
                 # Attempts JSON parsing.
                 json.loads(data)
@@ -177,9 +173,9 @@ def _validate_context(array: pa.Array, dtype: bigframes.dtypes.Dtype):
                 raise ValueError(f"Invalid JSON format found: {data!r}") from e
     elif bigframes.dtypes.is_struct_like(dtype):
         for field_name, dtype in bigframes.dtypes.get_struct_fields(dtype).items():
-            _validate_context(array.field(field_name), dtype)
+            _validate_content(array.field(field_name), dtype)
     elif bigframes.dtypes.is_array_like(dtype):
-        return _validate_context(
+        return _validate_content(
             array.flatten(), bigframes.dtypes.get_array_inner_type(dtype)
         )
 
@@ -362,15 +358,3 @@ def _physical_type_replacements(dtype: pa.DataType) -> pa.DataType:
     if dtype in _ARROW_MANAGED_STORAGE_OVERRIDES:
         return _ARROW_MANAGED_STORAGE_OVERRIDES[dtype]
     return dtype
-
-
-def _is_valid_json_series(s: pd.Series):
-    """Validate elements of a Series by attempting JSON parsing."""
-    for data in s:
-        # Skip scalar null values to avoid `TypeError` from `json.loads`.
-        if not utils.is_list_like(data) and pd.isna(data):
-            continue
-        try:
-            json.loads(data)
-        except json.JSONDecodeError as e:
-            raise ValueError(f"Invalid JSON format found: {data!r}") from e

From 772e490b3720396f8f6981e0d135512c23ab6c52 Mon Sep 17 00:00:00 2001
From: Chelsea Lin
Date: Fri, 18 Apr 2025 16:36:34 +0000
Subject: [PATCH 4/4] add null check back

---
 bigframes/core/local_data.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 8bae6839bd..e5c67fcf43 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -31,6 +31,7 @@
 import pyarrow.parquet  # type: ignore
 
 import bigframes.core.schema as schemata
+import bigframes.core.utils as utils
 import bigframes.dtypes
 
 
@@ -166,6 +167,9 @@ def _validate_content(array: pa.Array, dtype: bigframes.dtypes.Dtype):
     if dtype == bigframes.dtypes.JSON_DTYPE:
         values = array.to_pandas()
        for data in values:
+            # Skip scalar null values to avoid `TypeError` from `json.loads`.
+            if not utils.is_list_like(data) and pd.isna(data):
+                continue
             try:
                 # Attempts JSON parsing.
                 json.loads(data)
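
--
A minimal sketch of how the validation added by this series surfaces to
callers, assuming a configured BigQuery DataFrames `session`; the sample
value mirrors the tests above, and the engine choice is illustrative:

    import pandas as pd
    import bigframes.dtypes

    # "False" is not valid JSON (JSON booleans are lowercase), so the local
    # check in ManagedArrowTable.from_pandas raises before any data is sent
    # to BigQuery.
    s = pd.Series(["False"], dtype=bigframes.dtypes.JSON_DTYPE)
    try:
        session.read_pandas(s, write_engine="bigquery_inline")
    except ValueError as e:
        print(e)  # Invalid JSON format found: 'False'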