From 5cd413bd75dc1ba9f60389a53537c63769e3e3fa Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Sat, 5 Apr 2025 00:56:32 +0000
Subject: [PATCH 1/6] feat: Improve local data validation

---
 bigframes/core/array_value.py                 |  11 +-
 bigframes/core/blocks.py                      |   5 +-
 bigframes/core/local_data.py                  | 116 +++++++++++++-----
 bigframes/session/__init__.py                 |  23 +---
 .../ibis/backends/sql/compilers/base.py       |   6 +-
 5 files changed, 98 insertions(+), 63 deletions(-)

diff --git a/bigframes/core/array_value.py b/bigframes/core/array_value.py
index c7eaafe3de..071365199c 100644
--- a/bigframes/core/array_value.py
+++ b/bigframes/core/array_value.py
@@ -58,18 +58,19 @@ class ArrayValue:
 
     @classmethod
     def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
-        adapted_table = local_data.adapt_pa_table(arrow_table)
-        schema = local_data.arrow_schema_to_bigframes(adapted_table.schema)
+        data_source = local_data.ManagedArrowTable.from_pyarrow(arrow_table)
+        return cls.from_managed(source=data_source, session=session)
 
+    @classmethod
+    def from_managed(cls, source: local_data.ManagedArrowTable, session: Session):
         scan_list = nodes.ScanList(
             tuple(
                 nodes.ScanItem(ids.ColumnId(item.column), item.dtype, item.column)
-                for item in schema.items
+                for item in source.schema.items
             )
         )
-        data_source = local_data.ManagedArrowTable(adapted_table, schema)
         node = nodes.ReadLocalNode(
-            data_source,
+            source,
             session=session,
             scan_list=scan_list,
         )
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index f9bb44ca61..d56147cd47 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -53,6 +53,7 @@
 from bigframes import session
 from bigframes._config import sampling_options
 import bigframes.constants
+from bigframes.core import local_data
 import bigframes.core as core
 import bigframes.core.compile.googlesql as googlesql
 import bigframes.core.expression as ex
@@ -187,8 +188,8 @@ def from_local(
         pd_data = pd_data.set_axis(column_ids, axis=1)
         pd_data = pd_data.reset_index(names=index_ids)
 
-        as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
-        array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
+        managed_data = local_data.ManagedArrowTable.from_pandas(pd_data)
+        array_value = core.ArrayValue.from_managed(managed_data, session=session)
         block = cls(
             array_value,
             column_labels=column_labels,
diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 3a4b7ac0d9..d63706ca12 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -18,8 +18,12 @@
 
 import dataclasses
 import functools
+from typing import Union
 import uuid
 
+import geopandas
+import numpy as np
+import pandas
 import pyarrow as pa
 
 import bigframes.core.schema as schemata
@@ -32,51 +36,103 @@ class LocalTableMetadata:
     row_count: int
 
     @classmethod
-    def from_arrow(cls, table: pa.Table):
+    def from_arrow(cls, table: pa.Table) -> LocalTableMetadata:
         return cls(total_bytes=table.nbytes, row_count=table.num_rows)
 
 
+_MANAGED_STORAGE_TYPES_OVERRIDES: dict[bigframes.dtypes.Dtype, pa.DataType] = {
+    # wkt to be precise
+    bigframes.dtypes.GEO_DTYPE: pa.string()
+}
+
+
 @dataclasses.dataclass(frozen=True)
 class ManagedArrowTable:
     data: pa.Table = dataclasses.field(hash=False)
     schema: schemata.ArraySchema = dataclasses.field(hash=False)
     id: uuid.UUID = dataclasses.field(default_factory=uuid.uuid4)
 
+    def __post_init__(self):
+        self.validate()
+
     @functools.cached_property
-    def metadata(self):
+    def metadata(self) -> LocalTableMetadata:
         return LocalTableMetadata.from_arrow(self.data)
 
-
-def arrow_schema_to_bigframes(arrow_schema: pa.Schema) -> schemata.ArraySchema:
-    """Infer the corresponding bigframes schema given a pyarrow schema."""
-    schema_items = tuple(
-        schemata.SchemaItem(
-            field.name,
-            bigframes_type_for_arrow_type(field.type),
+    @classmethod
+    def from_pandas(cls, dataframe: pandas.DataFrame) -> ManagedArrowTable:
+        """Creates a managed arrow table from a pandas DataFrame. Ignores the index; column names must be unique strings."""
+        columns: list[pa.ChunkedArray] = []
+        fields: list[schemata.SchemaItem] = []
+        column_names = list(dataframe.columns)
+        assert len(column_names) == len(set(column_names))
+
+        for name, col in dataframe.items():
+            new_arr, bf_type = _adapt_pandas_series(col)
+            columns.append(new_arr)
+            fields.append(schemata.SchemaItem(str(name), bf_type))
+
+        return ManagedArrowTable(
+            pa.table(columns, names=column_names), schemata.ArraySchema(tuple(fields))
         )
-        for field in arrow_schema
-    )
-    return schemata.ArraySchema(schema_items)
-
-
-def adapt_pa_table(arrow_table: pa.Table) -> pa.Table:
-    """Adapt a pyarrow table to one that can be handled by bigframes. Converts tz to UTC and unit to us for temporal types."""
-    new_schema = pa.schema(
-        [
-            pa.field(field.name, arrow_type_replacements(field.type))
-            for field in arrow_table.schema
-        ]
-    )
-    return arrow_table.cast(new_schema)
 
+    @classmethod
+    def from_pyarrow(cls, table: pa.Table) -> ManagedArrowTable:
+        columns: list[pa.ChunkedArray] = []
+        fields: list[schemata.SchemaItem] = []
+        for name, arr in zip(table.column_names, table.columns):
+            new_arr, bf_type = _adapt_arrow_array(arr)
+            columns.append(new_arr)
+            fields.append(schemata.SchemaItem(name, bf_type))
+
+        return ManagedArrowTable(
+            pa.table(columns, names=table.column_names),
+            schemata.ArraySchema(tuple(fields)),
+        )
 
-def bigframes_type_for_arrow_type(pa_type: pa.DataType) -> bigframes.dtypes.Dtype:
-    return bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
-        arrow_type_replacements(pa_type)
-    )
+    def validate(self):
+        # TODO: Content-based validation for some datatypes (e.g. json, wkt, list) where logical domain is smaller than pyarrow type
+        for bf_field, arrow_field in zip(self.schema.items, self.data.schema):
+            expected_arrow_type = _get_managed_storage_type(bf_field.dtype)
+            arrow_type = arrow_field.type
+            if expected_arrow_type != arrow_type:
+                raise TypeError(
+                    f"Field {bf_field} has arrow array type: {arrow_type}, expected type: {expected_arrow_type}"
+                )
 
-def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
+def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
+    if dtype in _MANAGED_STORAGE_TYPES_OVERRIDES.keys():
+        return _MANAGED_STORAGE_TYPES_OVERRIDES[dtype]
+    else:
+        return bigframes.dtypes.bigframes_dtype_to_arrow_dtype(dtype)
+
+
+def _adapt_pandas_series(
+    series: pandas.Series,
+) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
+    if series.dtype == np.dtype("O"):
+        try:
+            series = series.astype(bigframes.dtypes.GEO_DTYPE)
+        except TypeError:
+            pass
+    if series.dtype == bigframes.dtypes.GEO_DTYPE:
+        series = geopandas.GeoSeries(series).to_wkt()
+        return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
+    return _adapt_arrow_array(pa.array(series))
+
+
+def _adapt_arrow_array(
+    array: Union[pa.ChunkedArray, pa.Array]
+) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
+    target_type = _arrow_type_replacements(array.type)
+    if target_type != array.type:
+        # TODO: Maybe warn if lossy conversion?
+        array = array.cast(target_type)
+    return array, bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
+
+
+def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_timestamp(type):
         # This is potentially lossy, but BigFrames doesn't support ns
         new_tz = "UTC" if (type.tz is not None) else None
@@ -92,7 +148,7 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
     if pa.types.is_decimal256(type):
         return pa.decimal256(76, 38)
     if pa.types.is_dictionary(type):
-        return arrow_type_replacements(type.value_type)
+        return _arrow_type_replacements(type.value_type)
     if pa.types.is_large_string(type):
         # simple string type can handle the largest strings needed
         return pa.string()
@@ -100,7 +156,7 @@ def arrow_type_replacements(type: pa.DataType) -> pa.DataType:
         # null as a type not allowed, default type is float64 for bigframes
         return pa.float64()
     if pa.types.is_list(type):
-        new_field_t = arrow_type_replacements(type.value_type)
+        new_field_t = _arrow_type_replacements(type.value_type)
         if new_field_t != type.value_type:
             return pa.list_(new_field_t)
         return type
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 72129d594e..e7f0fed9f4 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -108,11 +108,6 @@
 
 logger = logging.getLogger(__name__)
 
-NON_INLINABLE_DTYPES: Sequence[bigframes.dtypes.Dtype] = (
-    # Currently excluded as doesn't have arrow type
-    bigframes.dtypes.GEO_DTYPE,
-)
-
 
 class Session(
     third_party_pandas_gbq.GBQIOMixin,
@@ -798,12 +793,7 @@ def _read_pandas(
         )
 
         if write_engine == "default":
-            try:
-                inline_df = self._read_pandas_inline(pandas_dataframe)
-                return inline_df
-            except ValueError:
-                pass
-            return self._read_pandas_load_job(pandas_dataframe, api_name)
+            return self._read_pandas_inline(pandas_dataframe)
         elif write_engine == "bigquery_inline":
             return self._read_pandas_inline(pandas_dataframe)
         elif write_engine == "bigquery_load":
@@ -838,17 +828,6 @@ def _read_pandas_inline(
                 f"Could not convert with a BigQuery type: `{exc}`. "
             ) from exc
 
-        # Make sure all types are inlinable to avoid escaping errors.
-        inline_types = inline_df._block.expr.schema.dtypes
-        noninlinable_types = [
-            dtype for dtype in inline_types if dtype in NON_INLINABLE_DTYPES
-        ]
-        if len(noninlinable_types) != 0:
-            raise ValueError(
-                f"Could not inline with a BigQuery type: `{noninlinable_types}`. "
" - f"{constants.FEEDBACK_LINK}" - ) - return inline_df def _read_pandas_load_job( diff --git a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py index 305314edf0..6e98d6a9e1 100644 --- a/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py +++ b/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py @@ -813,10 +813,8 @@ def visit_DefaultLiteral(self, op, *, value, dtype): elif dtype.is_json(): return sge.ParseJSON(this=sge.convert(str(value))) elif dtype.is_geospatial(): - args = [value.wkt] - if (srid := dtype.srid) is not None: - args.append(srid) - return self.f.st_geomfromtext(*args) + wkt = value if isinstance(value, str) else value.wkt + return self.f.st_geogfromtext(wkt) raise NotImplementedError(f"Unsupported type: {dtype!r}") From 2b97cd7f9d6897568802c8db35b7e6eefcd16d72 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 5 Apr 2025 01:49:51 +0000 Subject: [PATCH 2/6] remove useless test --- tests/unit/session/test_io_pandas.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/unit/session/test_io_pandas.py b/tests/unit/session/test_io_pandas.py index 24155437fe..2fa07aed35 100644 --- a/tests/unit/session/test_io_pandas.py +++ b/tests/unit/session/test_io_pandas.py @@ -24,7 +24,6 @@ import pandas.testing import pyarrow # type: ignore import pytest -import shapely # type: ignore import bigframes.core.schema import bigframes.features @@ -504,17 +503,3 @@ def test_read_pandas_with_bigframes_dataframe(): ValueError, match=re.escape("read_pandas() expects a pandas.DataFrame") ): session.read_pandas(df) - - -def test_read_pandas_inline_w_noninlineable_type_raises_error(): - session = resources.create_bigquery_session() - data = [ - shapely.Point(1, 1), - shapely.Point(2, 1), - shapely.Point(1, 2), - ] - s = pandas.Series(data, dtype=geopandas.array.GeometryDtype()) - with pytest.raises( - ValueError, match="Could not (convert|inline) with a BigQuery type:" - ): - session.read_pandas(s, write_engine="bigquery_inline") From 773e7dcb7177556d10143a99d564f9569fa376b1 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 5 Apr 2025 01:52:54 +0000 Subject: [PATCH 3/6] fix geopandas import mypy complaints --- bigframes/core/local_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index d63706ca12..1646f79a25 100644 --- a/bigframes/core/local_data.py +++ b/bigframes/core/local_data.py @@ -21,7 +21,7 @@ from typing import Union import uuid -import geopandas +import geopandas # type: ignore import numpy as np import pandas import pyarrow as pa From 9cabcb8074c12f457d104eda2241ce7b73de281c Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Sat, 5 Apr 2025 04:23:52 +0000 Subject: [PATCH 4/6] fixes --- bigframes/core/local_data.py | 17 +++++++++++++---- bigframes/session/__init__.py | 7 ++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index 1646f79a25..7a468d800e 100644 --- a/bigframes/core/local_data.py +++ b/bigframes/core/local_data.py @@ -18,7 +18,7 @@ import dataclasses import functools -from typing import Union +from typing import cast, Union import uuid import geopandas # type: ignore @@ -117,7 +117,7 @@ def _adapt_pandas_series( except TypeError: pass if series.dtype == bigframes.dtypes.GEO_DTYPE: - series = geopandas.GeoSeries(series).to_wkt() + series = 
         return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
     return _adapt_arrow_array(pa.array(series))
@@ -129,6 +129,9 @@ def _adapt_arrow_array(
     if target_type != array.type:
         # TODO: Maybe warn if lossy conversion?
         array = array.cast(target_type)
+    bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
+    storage_type = _get_managed_storage_type(bf_type)
+    assert storage_type == array.type
     return array, bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
 
 
@@ -147,8 +150,6 @@ def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
         return pa.decimal128(38, 9)
     if pa.types.is_decimal256(type):
         return pa.decimal256(76, 38)
-    if pa.types.is_dictionary(type):
-        return _arrow_type_replacements(type.value_type)
     if pa.types.is_large_string(type):
         # simple string type can handle the largest strings needed
         return pa.string()
@@ -160,5 +161,12 @@ def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
         if new_field_t != type.value_type:
             return pa.list_(new_field_t)
         return type
+    if pa.types.is_struct(type):
+        struct_type = cast(pa.StructType, type)
+        new_fields: list[pa.Field] = []
+        for i in range(struct_type.num_fields):
+            field = struct_type.field(i)
+            new_fields.append(field.with_type(_arrow_type_replacements(field.type)))
+        return pa.struct(new_fields)
     else:
         return type
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index e7f0fed9f4..4304e33f35 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -793,7 +793,12 @@ def _read_pandas(
         )
 
         if write_engine == "default":
-            return self._read_pandas_inline(pandas_dataframe)
+            try:
+                inline_df = self._read_pandas_inline(pandas_dataframe)
+                return inline_df
+            except ValueError:
+                pass
+            return self._read_pandas_load_job(pandas_dataframe, api_name)
         elif write_engine == "bigquery_inline":
             return self._read_pandas_inline(pandas_dataframe)
         elif write_engine == "bigquery_load":

From 15a6e99305743b8bc3a17d6423b8b93df8c5d578 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Mon, 7 Apr 2025 18:35:59 +0000
Subject: [PATCH 5/6] fix duration storage validation

---
 bigframes/core/local_data.py | 7 +++++--
 bigframes/dtypes.py          | 2 ++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 7a468d800e..4ac0246f84 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -131,8 +131,11 @@ def _adapt_arrow_array(
         array = array.cast(target_type)
     bf_type = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
     storage_type = _get_managed_storage_type(bf_type)
-    assert storage_type == array.type
-    return array, bigframes.dtypes.arrow_dtype_to_bigframes_dtype(target_type)
+    if storage_type != array.type:
+        raise TypeError(
+            f"Expected {bf_type} to use arrow {storage_type}, instead got {array.type}"
+        )
+    return array, bf_type
 
 
 def _arrow_type_replacements(type: pa.DataType) -> pa.DataType:
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index 22cc521e8e..194da57ac1 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -456,6 +456,8 @@ def bigframes_dtype_to_arrow_dtype(
     if bigframes_dtype in _BIGFRAMES_TO_ARROW:
         return _BIGFRAMES_TO_ARROW[bigframes_dtype]
     if isinstance(bigframes_dtype, pd.ArrowDtype):
+        if pa.types.is_duration(bigframes_dtype.pyarrow_dtype):
+            return bigframes_dtype.pyarrow_dtype
         if pa.types.is_list(bigframes_dtype.pyarrow_dtype):
            return bigframes_dtype.pyarrow_dtype
         if pa.types.is_struct(bigframes_dtype.pyarrow_dtype):

From 5a7e1f608252a5d68a93d7b80219d92a35380208 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Mon, 7 Apr 2025 21:14:36 +0000
Subject: [PATCH 6/6] clean up geo type coercion

---
 bigframes/core/local_data.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py
index 4ac0246f84..b21f12ff94 100644
--- a/bigframes/core/local_data.py
+++ b/bigframes/core/local_data.py
@@ -111,15 +111,21 @@ def _get_managed_storage_type(dtype: bigframes.dtypes.Dtype) -> pa.DataType:
 def _adapt_pandas_series(
     series: pandas.Series,
 ) -> tuple[Union[pa.ChunkedArray, pa.Array], bigframes.dtypes.Dtype]:
-    if series.dtype == np.dtype("O"):
-        try:
-            series = series.astype(bigframes.dtypes.GEO_DTYPE)
-        except TypeError:
-            pass
+    # Mostly rely on pyarrow conversions, but have to convert geo without its help.
     if series.dtype == bigframes.dtypes.GEO_DTYPE:
         series = geopandas.GeoSeries(series).to_wkt(rounding_precision=-1)
         return pa.array(series, type=pa.string()), bigframes.dtypes.GEO_DTYPE
-    return _adapt_arrow_array(pa.array(series))
+    try:
+        return _adapt_arrow_array(pa.array(series))
+    except Exception as e:
+        if series.dtype == np.dtype("O"):
+            try:
+                # Retry object columns as geo (e.g. shapely values) before giving up.
+                series = series.astype(bigframes.dtypes.GEO_DTYPE)
+            except TypeError:
+                # Prefer the original conversion error.
+                raise e
+            return _adapt_pandas_series(series)
+        raise e
 
 
 def _adapt_arrow_array(