feat: Use session temp tables for all ephemeral storage #1569
TrevorBergeron merged 10 commits into main from
Conversation
assert result_table.clustering_fields == cluster_cols

session_resource_manager.close()
with pytest.raises(google.api_core.exceptions.NotFound):
FWIW: if you sync to main, I made a similar fix in https://github.com/googleapis/python-bigquery-dataframes/pull/1572/files. I think we can keep the pytest.raises and just do the loop.
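For reference, a minimal sketch of that loop (the helper name and its arguments are hypothetical, not from the PR):

```python
import google.api_core.exceptions
import pytest


def assert_session_tables_deleted(bqclient, table_ids):
    # Hypothetical helper: once the resource manager is closed, fetching
    # any session-owned table should raise NotFound.
    for table_id in table_ids:
        with pytest.raises(google.api_core.exceptions.NotFound):
            bqclient.get_table(table_id)
```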
ok, just merged in and used your version
tswast left a comment
Thanks! Just a few nits.
bigframes/session/__init__.py
Outdated
anon_dataset_manager = getattr(self, "_anon_dataset_manager", None)
if anon_dataset_manager:
    self._anon_dataset_manager.close()

if getattr(self, "_session_resource_manager", None):
    if self._session_resource_manager is not None:
        self._session_resource_manager.close()
Nit: We can make this a little more internally consistent. If you'd like to use getattr in the if statement, let's use the := (walrus operator) so that we aren't fetching from self more than once.
Suggested change:
-    anon_dataset_manager = getattr(self, "_anon_dataset_manager", None)
-    if anon_dataset_manager:
-        self._anon_dataset_manager.close()
-    if getattr(self, "_session_resource_manager", None):
-        if self._session_resource_manager is not None:
-            self._session_resource_manager.close()
+    if anon_dataset_manager := getattr(self, "_anon_dataset_manager", None):
+        anon_dataset_manager.close()
+    if session_resource_manager := getattr(self, "_session_resource_manager", None):
+        session_resource_manager.close()
Committed my own suggestion. 🤞 hopefully I didn't break anything.
    engine=engine,
    write_engine=write_engine,
)
table = self._temp_storage_manager.allocate_temp_table()
Does this mean the table isn't created right away? If so, I think we might need to supply a session ID in the load job, if available.
Edit: I see this was moved to read_bigquery_load_job, which makes sense to me. Aside: with more hybrid engine stuff in the future, I can imagine some cases where to_gbq() would be doing a load job to a user-managed table, but I suppose that would probably use a very different job config, anyway.
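If a load ever does need to run inside the session, attaching the session ID to the job config could look roughly like this (a sketch; whether the loader actually has the session ID at this point is an assumption):

```python
from typing import Optional

from google.cloud import bigquery


def load_job_config_for_session(session_id: Optional[str]) -> bigquery.LoadJobConfig:
    # Sketch only: route a load job into an existing BigQuery session so it
    # can read and write that session's temp tables.
    job_config = bigquery.LoadJobConfig()
    if session_id is not None:  # assumption: session ID is known to the caller
        job_config.connection_properties = [
            bigquery.ConnectionProperty("session_id", session_id)
        ]
    return job_config
```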
job_config.labels = {"bigframes-api": api_name}
job_config.schema_update_options = [
    google.cloud.bigquery.job.SchemaUpdateOption.ALLOW_FIELD_ADDITION
]
I'm curious: why ALLOW_FIELD_ADDITION here, but using WRITE_TRUNCATE for read_gbq_load_job? Might be worthwhile to add some comments.
I just want to make sure the ordering_col does not get overridden, as that is what is clustered.
Might still work with TRUNCATE as well?
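For contrast, here is roughly what the two configurations under discussion look like side by side (a simplified sketch, not the PR's exact code):

```python
from google.cloud import bigquery

# Append-style load: new columns may be added, but existing columns (such
# as the clustered ordering column) are left alone.
append_config = bigquery.LoadJobConfig()
append_config.schema_update_options = [
    bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
]

# Truncate-style load: the table's contents and schema are replaced outright.
truncate_config = bigquery.LoadJobConfig()
truncate_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
```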
self._start_generic_job(load_job)
table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"

# Update the table expiration so we aren't limited to the default 24
# hours in the anonymous dataset.
I assume _storage_manager.create_temp_table handles this in the anonymous dataset case now?
Edit: Yes, I see we do set an expiration above:
expiration = (
    datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
)
table = bf_io_bigquery.create_temp_table(
    self.bqclient,
    self.allocate_temp_table(),
    expiration,
    schema=schema,
    cluster_columns=list(cluster_cols),
    kms_key=self._kms_key,
)
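For reference, bumping an existing table's expiration through the public client API looks roughly like this (a sketch with a hypothetical table name, not the PR's code path):

```python
import datetime

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my-project.my_dataset.my_temp_table")  # hypothetical
# Push the expiration out past the anonymous dataset's 24-hour default.
table.expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
    days=7
)
client.update_table(table, ["expires"])
```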