feat: Support write api as loading option #1617
Conversation
bigframes/core/local_data.py
Outdated
    for field in self.data.schema
)

# Can't use RecordBatch.cast until set higher min pyarrow version
Could we include which version of pyarrow that is? Also, this seems like a TODO to switch to RecordBatch.cast when we can.
# TODO: Use RecordBatch.cast once our minimum pyarrow version is at least X.Y.Z+.
Yeah, updated to a TODO with the needed version, 16.0, which is higher than our current floor of 15.0.2.
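For reference, a minimal sketch of the two forms, assuming pyarrow's RecordBatch.cast (added in 16.0); the helper name is illustrative:

import pyarrow as pa

def cast_batch(batch: pa.RecordBatch, schema: pa.Schema) -> pa.RecordBatch:
    # Pre-16.0 workaround: cast each column individually and rebuild the batch.
    # Once the pyarrow floor reaches 16.0, this whole body collapses to:
    #     return batch.cast(schema)
    return pa.record_batch(
        [arr.cast(type) for arr, type in zip(batch.columns, schema.types)],
        schema=schema,
    )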
bigframes/core/local_data.py
Outdated
if duration_type == "int":

    @_recursive_map_types
    def durations_to_ints(type: pa.DataType) -> pa.DataType:
        if pa.types.is_duration(type):
            return pa.int64()
        return type

    schema = pa.schema(
        pa.field(field.name, durations_to_ints(field.type))
        for field in self.data.schema
    )

    # Can't use RecordBatch.cast until set higher min pyarrow version
    def convert_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
        return pa.record_batch(
            [arr.cast(type) for arr, type in zip(batch.columns, schema.types)],
            schema=schema,
        )

    batches = map(convert_batch, batches)
This type conversion logic seems like it could be pulled out into a separate function for easier unit testing.
Optional: parameterize it to support conversions from any type into any type. This might be useful for JSON in the future, for example. Then again, https://wiki.c2.com/?YouArentGonnaNeedIt, so let's just start with a refactor of the duration logic and parameterize it when we actually need that functionality.
Yeah, I don't want to generalize this too hard; the general case might involve more complex things than a simple cast, and I don't want to build the scaffolding for it without reason. The next iteration will pull out the duration logic, though.
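One possible shape for that refactor, as a hypothetical sketch rather than the actual change (the real code applies the mapping recursively via _recursive_map_types to handle nested types):

import pyarrow as pa

def durations_to_ints_schema(schema: pa.Schema) -> pa.Schema:
    # Hypothetical standalone helper for unit testing; maps top-level
    # duration fields to int64 and leaves everything else unchanged.
    def to_int(type: pa.DataType) -> pa.DataType:
        return pa.int64() if pa.types.is_duration(type) else type

    return pa.schema(
        pa.field(field.name, to_int(field.type)) for field in schema
    )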
    raise ValueError(
        f"Problem loading at least one row from DataFrame: {response.row_errors}. {constants.FEEDBACK_LINK}"
    )
destination_table = self._bqclient.get_table(bq_table_ref)
Do we need to do something here to finalize the stream? https://cloud.google.com/bigquery/docs/write-api-streaming
Not strictly necessary, but it can help avoid limits, per the docs: "This step is optional in committed type, but helps to prevent exceeding the limit on active streams."
Added finalize in the new iteration.
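For context, a minimal sketch of finalizing a committed-type stream with the Storage Write API client; the project, table, and stream names below are placeholders:

from google.cloud import bigquery_storage_v1

client = bigquery_storage_v1.BigQueryWriteClient()

# Placeholder stream name; the real one comes back from create_write_stream().
stream_name = "projects/my-project/datasets/my_dataset/tables/my_table/streams/my-stream-id"

# Optional for committed-type streams, but helps avoid exceeding the
# limit on concurrently active streams.
client.finalize_write_stream(name=stream_name)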
e2e failure: Looks like a flake, unrelated to this change.
tswast left a comment
Love it. Thanks! Just one last thing
    return self._loader.read_pandas(
        pandas_dataframe, method="stream", api_name=api_name
    )
elif write_engine == "bigquery_write":
Could you also add this to the various docstrings? Let's mark the "bigquery_write" option as [Preview] in the docs, too.
Added to the only docstring that enumerates the engines.
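For anyone landing here later, usage would look roughly like this (assuming the read_pandas write_engine parameter this PR extends; "bigquery_write" is marked [Preview]):

import pandas as pd
import bigframes.pandas as bpd

pdf = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})

# Load the local DataFrame via the BigQuery Storage Write API engine added in this PR.
df = bpd.read_pandas(pdf, write_engine="bigquery_write")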