diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml deleted file mode 100644 index 4a311db02..000000000 --- a/.github/.OwlBot.lock.yaml +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -docker: - image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:543e209e7c1c1ffe720eb4db1a3f045a75099304fb19aa11a47dc717b8aae2a9 -# created: 2025-10-09T14:48:42.914384887Z diff --git a/.github/.OwlBot.yaml b/.github/.OwlBot.yaml deleted file mode 100644 index 67768efb5..000000000 --- a/.github/.OwlBot.yaml +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -docker: - image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - -deep-remove-regex: - - /owl-bot-staging - -# In source, we've used two capturing groups (v2) and (.*) to match the version -# and the directory path. -# In dest, we use $1 to refer to the first capturing group (v2) and $2 to refer -# to the second capturing group (directory path). -deep-copy-regex: - - source: /google/storage/(v2)/storage-v2-py/(.*) - dest: /owl-bot-staging/$1/$2 - -begin-after-commit-hash: 6acf4a0a797f1082027985c55c4b14b60f673dd7 diff --git a/.librarian/generator-input/.repo-metadata.json b/.librarian/generator-input/.repo-metadata.json new file mode 100644 index 000000000..f644429bc --- /dev/null +++ b/.librarian/generator-input/.repo-metadata.json @@ -0,0 +1,18 @@ +{ + "name": "storage", + "name_pretty": "Google Cloud Storage", + "product_documentation": "https://cloud.google.com/storage", + "client_documentation": "https://cloud.google.com/python/docs/reference/storage/latest", + "issue_tracker": "https://issuetracker.google.com/savedsearches/559782", + "release_level": "stable", + "language": "python", + "library_type": "GAPIC_MANUAL", + "repo": "googleapis/python-storage", + "distribution_name": "google-cloud-storage", + "api_id": "storage.googleapis.com", + "requires_billing": true, + "default_version": "v2", + "codeowner_team": "@googleapis/gcs-sdk-team", + "api_shortname": "storage", + "api_description": "is a durable and highly available object storage service. Google Cloud Storage is almost infinitely scalable and guarantees consistency: when a write succeeds, the latest copy of the object will be returned to any GET, globally." 
+} diff --git a/owlbot.py b/.librarian/generator-input/librarian.py similarity index 65% rename from owlbot.py rename to .librarian/generator-input/librarian.py index 67b2369ce..ad0baa2a3 100644 --- a/owlbot.py +++ b/.librarian/generator-input/librarian.py @@ -98,41 +98,11 @@ "noxfile.py", "CONTRIBUTING.rst", "README.rst", - ".kokoro/continuous/continuous.cfg", - ".kokoro/presubmit/system-3.8.cfg", - ".kokoro/presubmit/prerelease-deps.cfg", - ".kokoro/continuous/prerelease-deps.cfg", - ".github/blunderbuss.yml", # blunderbuss assignment to python squad - ".github/workflows", # exclude gh actions as credentials are needed for tests - ".github/release-please.yml", # special support for a python2 branch in this repo + ".kokoro/**", + ".github/**", ], ) -s.replace( - ".kokoro/build.sh", - "export PYTHONUNBUFFERED=1", - """export PYTHONUNBUFFERED=1 - -# Export variable to override api endpoint -export API_ENDPOINT_OVERRIDE - -# Export variable to override api endpoint version -export API_VERSION_OVERRIDE - -# Export dual region locations -export DUAL_REGION_LOC_1 -export DUAL_REGION_LOC_2 - -# Setup universe domain testing needed environment variables. -export TEST_UNIVERSE_DOMAIN_CREDENTIAL=$(realpath ${KOKORO_GFILE_DIR}/secret_manager/client-library-test-universe-domain-credential) -export TEST_UNIVERSE_DOMAIN=$(gcloud secrets versions access latest --project cloud-devrel-kokoro-resources --secret=client-library-test-universe-domain) -export TEST_UNIVERSE_PROJECT_ID=$(gcloud secrets versions access latest --project cloud-devrel-kokoro-resources --secret=client-library-test-universe-project-id) -export TEST_UNIVERSE_LOCATION=$(gcloud secrets versions access latest --project cloud-devrel-kokoro-resources --secret=client-library-test-universe-storage-location) - -""") - python.py_samples(skip_readmes=True) -# Use a python runtime which is available in the owlbot post processor here -# https://github.com/googleapis/synthtool/blob/master/docker/owlbot/python/Dockerfile -s.shell.run(["nox", "-s", "blacken-3.10"], hide_output=False) +s.shell.run(["nox", "-s", "blacken"], hide_output=False) diff --git a/.librarian/generator-input/noxfile.py b/.librarian/generator-input/noxfile.py new file mode 100644 index 000000000..16cf97b01 --- /dev/null +++ b/.librarian/generator-input/noxfile.py @@ -0,0 +1,408 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
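The exclude list in librarian.py above switches from itemized paths to recursive globs (".kokoro/**", ".github/**"). A minimal sketch, not part of the diff, of how such patterns can match repository paths, assuming fnmatch-style semantics where `*` also crosses `/` boundaries (synthtool's actual matcher may differ):

```python
import fnmatch

# Patterns taken from the librarian.py excludes above.
EXCLUDES = ["noxfile.py", "CONTRIBUTING.rst", "README.rst", ".kokoro/**", ".github/**"]

def is_excluded(path: str) -> bool:
    # fnmatch's "*" matches "/" as well, so ".kokoro/**" covers everything
    # under .kokoro/ (unlike strict shell globstar semantics).
    return any(fnmatch.fnmatch(path, pattern) for pattern in EXCLUDES)

assert is_excluded(".kokoro/build.sh")
assert is_excluded(".github/workflows/unittest.yml")
assert not is_excluded("google/cloud/storage/client.py")
```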
+ +from __future__ import absolute_import +import os +import pathlib +import re +import shutil + +import nox + + +BLACK_VERSION = "black==23.7.0" +BLACK_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"] + +DEFAULT_PYTHON_VERSION = "3.12" +SYSTEM_TEST_PYTHON_VERSIONS = ["3.12"] +UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] +CONFORMANCE_TEST_PYTHON_VERSIONS = ["3.12"] + +CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() + +# Error if a python version is missing +nox.options.error_on_missing_interpreters = True + +nox.options.sessions = [ + "blacken", + "conftest_retry", + "docfx", + "docs", + "lint", + "lint_setup_py", + "system", + # TODO(https://github.com/googleapis/python-storage/issues/1499): + # Remove or restore testing for Python 3.7/3.8 + "unit-3.9", + "unit-3.10", + "unit-3.11", + "unit-3.12", + "unit-3.13", + # cover must be last to avoid error `No data to report` + "cover", +] + + +@nox.session(python=DEFAULT_PYTHON_VERSION) +def lint(session): + """Run linters. + + Returns a failure if the linters find linting errors or sufficiently + serious code quality issues. + """ + # Pin flake8 to 6.0.0 + # See https://github.com/googleapis/python-storage/issues/1102 + session.install("flake8==6.0.0", BLACK_VERSION) + session.run( + "black", + "--check", + *BLACK_PATHS, + ) + session.run("flake8", "google", "tests") + + +@nox.session(python="3.14") +def blacken(session): + """Run black. + + Format code to uniform standard. + """ + session.install(BLACK_VERSION) + session.run( + "black", + *BLACK_PATHS, + ) + + +@nox.session(python=DEFAULT_PYTHON_VERSION) +def lint_setup_py(session): + """Verify that setup.py is valid (including RST check).""" + session.install("docutils", "pygments", "setuptools>=79.0.1") + session.run("python", "setup.py", "check", "--restructuredtext", "--strict") + + +def default(session, install_extras=True): + constraints_path = str( + CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" + ) + # Install all test dependencies, then install this package in-place. + session.install( + "mock", + "pytest", + "pytest-cov", + "pytest-asyncio", + "brotli", + "grpcio", + "grpcio-status", + "proto-plus", + "grpc-google-iam-v1", + "-c", + constraints_path, + ) + + if install_extras: + session.install("opentelemetry-api", "opentelemetry-sdk") + + session.install("-e", ".", "-c", constraints_path) + + # This dependency is included in setup.py for backwards compatibility only + # and the client library is expected to pass all tests without it. See + # setup.py and README for details. + session.run("pip", "uninstall", "-y", "google-resumable-media") + + # Run py.test against the unit tests. + session.run( + "py.test", + "--quiet", + f"--junitxml=unit_{session.python}_sponge_log.xml", + "--cov=google.cloud.storage", + "--cov=google.cloud", + "--cov=tests.unit", + "--cov-append", + "--cov-config=.coveragerc", + "--cov-report=", + "--cov-fail-under=0", + os.path.join("tests", "unit"), + os.path.join("tests", "resumable_media", "unit"), + *session.posargs, + ) + + +@nox.session(python=UNIT_TEST_PYTHON_VERSIONS) +def unit(session): + """Run the unit test suite.""" + default(session) + + +@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS) +def system(session): + constraints_path = str( + CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" + ) + """Run the system test suite.""" + rerun_count = 0 + + # Check the value of `RUN_SYSTEM_TESTS` env var. It defaults to true. 
+    if os.environ.get("RUN_SYSTEM_TESTS", "true") == "false":
+        session.skip("RUN_SYSTEM_TESTS is set to false, skipping")
+    # Environment check: Only run tests if the environment variable is set.
+    if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""):
+        session.skip(
+            "Credentials must be set via environment variable GOOGLE_APPLICATION_CREDENTIALS"
+        )
+    # mTLS tests require pyopenssl.
+    if os.environ.get("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") == "true":
+        session.install("pyopenssl")
+    # Check if the endpoint is being overridden, for rerun_count
+    if (
+        os.getenv("API_ENDPOINT_OVERRIDE", "https://storage.googleapis.com")
+        != "https://storage.googleapis.com"
+    ):
+        rerun_count = 3
+
+    # Use pre-release gRPC for system tests.
+    # TODO: Remove ban of 1.52.0rc1 once grpc/grpc#31885 is resolved.
+    session.install("--pre", "grpcio!=1.52.0rc1")
+
+    # Install all test dependencies, then install this package into the
+    # virtualenv's dist-packages.
+    # 2021-05-06: defer installing 'google-cloud-*' to after this package,
+    # in order to work around the Python 2.7 googleapis-common-protos
+    # issue.
+    session.install("mock", "pytest", "pytest-rerunfailures", "-c", constraints_path)
+    session.install("-e", ".", "-c", constraints_path)
+    session.install(
+        "google-cloud-testutils",
+        "google-cloud-iam",
+        "google-cloud-pubsub",
+        "google-cloud-kms",
+        "brotli",
+        "-c",
+        constraints_path,
+    )
+
+    # Run py.test against the system tests.
+    session.run(
+        "py.test",
+        "--quiet",
+        f"--junitxml=system_{session.python}_sponge_log.xml",
+        "--reruns={}".format(rerun_count),
+        os.path.join("tests", "system"),
+        os.path.join("tests", "resumable_media", "system"),
+        *session.posargs,
+    )
+
+
+@nox.session(python=CONFORMANCE_TEST_PYTHON_VERSIONS)
+def conftest_retry(session):
+    """Run the retry conformance test suite."""
+    conformance_test_folder_path = os.path.join("tests", "conformance")
+    conformance_test_folder_exists = os.path.exists(conformance_test_folder_path)
+    # Environment check: only run tests if found.
+    if not conformance_test_folder_exists:
+        session.skip("Conformance tests were not found")
+
+    # Install all test dependencies and the pytest plugin to run tests in parallel.
+    # Then install this package in-place.
+    session.install("pytest", "pytest-xdist")
+    session.install("-e", ".")
+
+    # Run #CPU processes in parallel if no test session arguments are passed in.
+    if session.posargs:
+        test_cmd = [
+            "py.test",
+            "--quiet",
+            conformance_test_folder_path,
+            *session.posargs,
+        ]
+    else:
+        test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path]
+
+    # Run py.test against the conformance tests.
+    session.run(*test_cmd)
+
+
+@nox.session(python=DEFAULT_PYTHON_VERSION)
+def cover(session):
+    """Run the final coverage report.
+
+    This outputs the coverage report aggregating coverage from the unit
+    test runs (not system test runs), and then erases coverage data.
+    """
+    session.install("coverage", "pytest-cov")
+    session.run("coverage", "report", "--show-missing", "--fail-under=99")
+
+    session.run("coverage", "erase")
+
+
+@nox.session(python="3.10")
+def docs(session):
+    """Build the docs for this library."""
+
+    session.install("-e", ".")
+    session.install(
+        # We need to pin to specific versions of the `sphinxcontrib-*` packages
+        # which still support sphinx 4.x.
+        # See https://github.com/googleapis/sphinx-docfx-yaml/issues/344
+        # and https://github.com/googleapis/sphinx-docfx-yaml/issues/345.
+ "sphinxcontrib-applehelp==1.0.4", + "sphinxcontrib-devhelp==1.0.2", + "sphinxcontrib-htmlhelp==2.0.1", + "sphinxcontrib-qthelp==1.0.3", + "sphinxcontrib-serializinghtml==1.1.5", + "sphinx==4.5.0", + "alabaster", + "recommonmark", + ) + + shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True) + session.run( + "sphinx-build", + "-W", # warnings as errors + "-T", # show full traceback on exception + "-N", # no colors + "-b", + "html", + "-d", + os.path.join("docs", "_build", "doctrees", ""), + os.path.join("docs", ""), + os.path.join("docs", "_build", "html", ""), + ) + + +@nox.session(python="3.10") +def docfx(session): + """Build the docfx yaml files for this library.""" + + session.install("-e", ".") + session.install("grpcio") + session.install( + # We need to pin to specific versions of the `sphinxcontrib-*` packages + # which still support sphinx 4.x. + # See https://github.com/googleapis/sphinx-docfx-yaml/issues/344 + # and https://github.com/googleapis/sphinx-docfx-yaml/issues/345. + "sphinxcontrib-applehelp==1.0.4", + "sphinxcontrib-devhelp==1.0.2", + "sphinxcontrib-htmlhelp==2.0.1", + "sphinxcontrib-qthelp==1.0.3", + "sphinxcontrib-serializinghtml==1.1.5", + "gcp-sphinx-docfx-yaml", + "alabaster", + "recommonmark", + ) + + shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True) + session.run( + "sphinx-build", + "-T", # show full traceback on exception + "-N", # no colors + "-D", + ( + "extensions=sphinx.ext.autodoc," + "sphinx.ext.autosummary," + "docfx_yaml.extension," + "sphinx.ext.intersphinx," + "sphinx.ext.coverage," + "sphinx.ext.napoleon," + "sphinx.ext.todo," + "sphinx.ext.viewcode," + "recommonmark" + ), + "-b", + "html", + "-d", + os.path.join("docs", "_build", "doctrees", ""), + os.path.join("docs", ""), + os.path.join("docs", "_build", "html", ""), + ) + + +@nox.session(python=UNIT_TEST_PYTHON_VERSIONS[-1]) +@nox.parametrize( + "protobuf_implementation", + ["python", "upb"], +) +def prerelease_deps(session, protobuf_implementation): + """Run all tests with prerelease versions of dependencies installed.""" + + # Install all test dependencies + session.install("mock", "pytest", "pytest-cov", "brotli") + + # Install dependencies needed for system tests + session.install( + "google-cloud-pubsub", + "google-cloud-kms", + "google-cloud-testutils", + "google-cloud-iam", + ) + + # Install all dependencies + session.install("-e", ".[protobuf, tracing]") + + prerel_deps = [ + "google-api-core", + "google-auth", + "google-cloud-core", + "google-crc32c", + "google-resumable-media", + "opentelemetry-api", + "protobuf", + ] + + package_namespaces = { + "google-api-core": "google.api_core", + "google-auth": "google.auth", + "google-cloud-core": "google.cloud.version", + "opentelemetry-api": "opentelemetry.version", + "protobuf": "google.protobuf", + } + + for dep in prerel_deps: + session.install("--pre", "--no-deps", "--upgrade", dep) + print(f"Installed {dep}") + + version_namespace = package_namespaces.get(dep) + + if version_namespace: + session.run( + "python", + "-c", + f"import {version_namespace}; print({version_namespace}.__version__)", + ) + # Remaining dependencies + other_deps = [ + "requests", + ] + session.install(*other_deps) + + session.run( + "py.test", + "tests/unit", + env={ + "PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation, + }, + ) + + session.run( + "py.test", + "--verbose", + f"--junitxml=system_{session.python}_sponge_log.xml", + os.path.join("tests", "system"), + *session.posargs, + env={ + 
"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation, + }, + ) diff --git a/.librarian/generator-input/setup.py b/.librarian/generator-input/setup.py new file mode 100644 index 000000000..2c4504749 --- /dev/null +++ b/.librarian/generator-input/setup.py @@ -0,0 +1,107 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import os + +import setuptools + + +# Package metadata. + +name = "google-cloud-storage" +description = "Google Cloud Storage API client library" +# Should be one of: +# 'Development Status :: 3 - Alpha' +# 'Development Status :: 4 - Beta' +# 'Development Status :: 5 - Production/Stable' +release_status = "Development Status :: 5 - Production/Stable" +dependencies = [ + "google-auth >= 2.26.1, < 3.0.0", + "google-api-core >= 2.27.0, < 3.0.0", + "google-cloud-core >= 2.4.2, < 3.0.0", + # The dependency "google-resumable-media" is no longer used. However, the + # dependency is still included here to accommodate users who may be + # importing exception classes from the google-resumable-media without + # installing it explicitly. See the python-storage README for details on + # exceptions and importing. Users who are not importing + # google-resumable-media classes in their application can safely disregard + # this dependency. + "google-resumable-media >= 2.7.2, < 3.0.0", + "requests >= 2.22.0, < 3.0.0", + "google-crc32c >= 1.1.3, < 2.0.0", +] +extras = { + "protobuf": ["protobuf >= 3.20.2, < 7.0.0"], + "tracing": [ + "opentelemetry-api >= 1.1.0, < 2.0.0", + ], +} + + +# Setup boilerplate below this line. + +package_root = os.path.abspath(os.path.dirname(__file__)) + +version = {} +with open(os.path.join(package_root, "google/cloud/storage/version.py")) as fp: + exec(fp.read(), version) +version = version["__version__"] + +readme_filename = os.path.join(package_root, "README.rst") +with io.open(readme_filename, encoding="utf-8") as readme_file: + readme = readme_file.read() + +# Only include packages under the 'google' namespace. Do not include tests, +# benchmarks, etc. 
+packages = [ + package + for package in setuptools.find_namespace_packages() + if package.startswith("google") +] + + +setuptools.setup( + name=name, + version=version, + description=description, + long_description=readme, + author="Google LLC", + author_email="googleapis-packages@google.com", + license="Apache 2.0", + url="https://github.com/googleapis/python-storage", + classifiers=[ + release_status, + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Operating System :: OS Independent", + "Topic :: Internet", + ], + platforms="Posix; MacOS X; Windows", + packages=packages, + install_requires=dependencies, + extras_require=extras, + python_requires=">=3.7", + include_package_data=True, + zip_safe=False, +) diff --git a/.librarian/state.yaml b/.librarian/state.yaml new file mode 100644 index 000000000..1502e804d --- /dev/null +++ b/.librarian/state.yaml @@ -0,0 +1,31 @@ +image: us-central1-docker.pkg.dev/cloud-sdk-librarian-prod/images-prod/python-librarian-generator@sha256:8e2c32496077054105bd06c54a59d6a6694287bc053588e24debe6da6920ad91 +libraries: + - id: google-cloud-storage + version: 3.6.0 + last_generated_commit: 5400ccce473c439885bd6bf2924fd242271bfcab + apis: + - path: google/storage/v2 + service_config: storage_v2.yaml + source_roots: + - . + preserve_regex: [] + remove_regex: + - ^.flake8 + - ^.pre-commit-config.yaml + - ^.trampolinerc + - ^.repo-metadata.json + - ^LICENSE + - ^MANIFEST.in + - ^SECURITY.md + - ^mypy.ini + - ^noxfile.py + - ^renovate.json + - ^setup.py + - ^docs/summary_overview.md + - ^google/cloud/_storage_v2 + - ^samples/generated_samples + - ^testing/constraints-3.8.txt + - ^testing/constraints-3.1.* + - ^tests/__init__.py + - ^tests/unit/gapic + tag_format: v{version} diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ee1c7beb..da1f2149b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ [1]: https://pypi.org/project/google-cloud-storage/#history +## [3.7.0](https://github.com/googleapis/python-storage/compare/v3.6.0...v3.7.0) (2025-12-09) + + +### Features + +* Auto enable mTLS when supported certificates are detected ([#1637](https://github.com/googleapis/python-storage/issues/1637)) ([4e91c54](https://github.com/googleapis/python-storage/commit/4e91c541363f0e583bf9dd1b81a95ff2cb618bac)) +* Send entire object checksum in the final api call of resumable upload ([#1654](https://github.com/googleapis/python-storage/issues/1654)) ([ddce7e5](https://github.com/googleapis/python-storage/commit/ddce7e53a13e6c0487221bb14e88161da7ed9e08)) +* Support urllib3 >= 2.6.0 ([#1658](https://github.com/googleapis/python-storage/issues/1658)) ([57405e9](https://github.com/googleapis/python-storage/commit/57405e956a7ca579b20582bf6435cec42743c478)) + + +### Bug Fixes + +* Fix for [move_blob](https://github.com/googleapis/python-storage/blob/57405e956a7ca579b20582bf6435cec42743c478/google/cloud/storage/bucket.py#L2256) failure when the new blob name contains characters that need to be url encoded ([#1605](https://github.com/googleapis/python-storage/issues/1605)) 
([ec470a2](https://github.com/googleapis/python-storage/commit/ec470a270e189e137c7229cc359367d5a897cdb9)) + ## [3.6.0](https://github.com/googleapis/python-storage/compare/v3.5.0...v3.6.0) (2025-11-17) diff --git a/cloudbuild/run_zonal_tests.sh b/cloudbuild/run_zonal_tests.sh new file mode 100644 index 000000000..ef94e629b --- /dev/null +++ b/cloudbuild/run_zonal_tests.sh @@ -0,0 +1,26 @@ + +set -euxo pipefail +echo '--- Installing git and cloning repository on VM ---' +sudo apt-get update && sudo apt-get install -y git python3-pip python3-venv + +# Clone the repository and checkout the specific commit from the build trigger. +git clone https://github.com/googleapis/python-storage.git +cd python-storage +git checkout ${COMMIT_SHA} + + +echo '--- Installing Python and dependencies on VM ---' +python3 -m venv env +source env/bin/activate + +echo 'Install testing libraries explicitly, as they are not in setup.py' +pip install --upgrade pip +pip install pytest pytest-timeout pytest-subtests pytest-asyncio +pip install google-cloud-testutils google-cloud-kms +pip install -e . + +echo '--- Setting up environment variables on VM ---' +export ZONAL_BUCKET=${_ZONAL_BUCKET} +export RUN_ZONAL_SYSTEM_TESTS=True +echo '--- Running Zonal tests on VM ---' +pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' tests/system/test_zonal.py diff --git a/cloudbuild/zb-system-tests-cloudbuild.yaml b/cloudbuild/zb-system-tests-cloudbuild.yaml new file mode 100644 index 000000000..be790ebd4 --- /dev/null +++ b/cloudbuild/zb-system-tests-cloudbuild.yaml @@ -0,0 +1,70 @@ +substitutions: + _REGION: "us-central1" + _ZONE: "us-central1-a" + _SHORT_BUILD_ID: ${BUILD_ID:0:8} + +steps: + + # Step 1 Create a GCE VM to run the tests. + # The VM is created in the same zone as the buckets to test rapid storage features. + # It's given the 'cloud-platform' scope to allow it to access GCS and other services. + - name: "gcr.io/google.com/cloudsdktool/cloud-sdk" + id: "create-vm" + entrypoint: "gcloud" + args: + - "compute" + - "instances" + - "create" + - "gcsfs-test-vm-${_SHORT_BUILD_ID}" + - "--project=${PROJECT_ID}" + - "--zone=${_ZONE}" + - "--machine-type=e2-medium" + - "--image-family=debian-13" + - "--image-project=debian-cloud" + - "--service-account=${_ZONAL_VM_SERVICE_ACCOUNT}" + - "--scopes=https://www.googleapis.com/auth/devstorage.full_control,https://www.googleapis.com/auth/devstorage.read_only,https://www.googleapis.com/auth/devstorage.read_write" + - "--metadata=enable-oslogin=TRUE" + waitFor: ["-"] + + # Step 2: Run the integration tests inside the newly created VM and cleanup. + # This step uses 'gcloud compute ssh' to execute a remote script. + # The VM is deleted after tests are run, regardless of success. + - name: "gcr.io/google.com/cloudsdktool/cloud-sdk" + id: "run-tests-and-delete-vm" + entrypoint: "bash" + args: + - "-c" + - | + set -e + # Wait for the VM to be fully initialized and SSH to be ready. + for i in {1..10}; do + if gcloud compute ssh gcsfs-test-vm-${_SHORT_BUILD_ID} --zone=${_ZONE} --internal-ip --command="echo VM is ready"; then + break + fi + echo "Waiting for VM to become available... (attempt $i/10)" + sleep 15 + done + # copy the script to the VM + gcloud compute scp cloudbuild/run_zonal_tests.sh gcsfs-test-vm-${_SHORT_BUILD_ID}:~ --zone=${_ZONE} --internal-ip + + # Execute the script on the VM via SSH. + # Capture the exit code to ensure cleanup happens before the build fails. 
+ set +e + gcloud compute ssh gcsfs-test-vm-${_SHORT_BUILD_ID} --zone=${_ZONE} --internal-ip --command="COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh" + EXIT_CODE=$? + set -e + + echo "--- Deleting GCE VM ---" + gcloud compute instances delete "gcsfs-test-vm-${_SHORT_BUILD_ID}" --zone=${_ZONE} --quiet + + # Exit with the original exit code from the test script. + exit $$EXIT_CODE + waitFor: + - "create-vm" + +timeout: "3600s" # 60 minutes + +options: + logging: CLOUD_LOGGING_ONLY + pool: + name: "projects/${PROJECT_ID}/locations/us-central1/workerPools/cloud-build-worker-pool" \ No newline at end of file diff --git a/google/cloud/_storage_v2/gapic_version.py b/google/cloud/_storage_v2/gapic_version.py index 20a9cd975..d69b0530e 100644 --- a/google/cloud/_storage_v2/gapic_version.py +++ b/google/cloud/_storage_v2/gapic_version.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__version__ = "0.0.0" # {x-release-please-version} +__version__ = "3.6.0" # {x-release-please-version} diff --git a/google/cloud/_storage_v2/services/storage/client.py b/google/cloud/_storage_v2/services/storage/client.py index 16c76a01f..cdccf3fab 100644 --- a/google/cloud/_storage_v2/services/storage/client.py +++ b/google/cloud/_storage_v2/services/storage/client.py @@ -184,6 +184,34 @@ def _get_default_mtls_endpoint(api_endpoint): _DEFAULT_ENDPOINT_TEMPLATE = "storage.{UNIVERSE_DOMAIN}" _DEFAULT_UNIVERSE = "googleapis.com" + @staticmethod + def _use_client_cert_effective(): + """Returns whether client certificate should be used for mTLS if the + google-auth version supports should_use_client_cert automatic mTLS enablement. + + Alternatively, read from the GOOGLE_API_USE_CLIENT_CERTIFICATE env var. + + Returns: + bool: whether client certificate should be used for mTLS + Raises: + ValueError: (If using a version of google-auth without should_use_client_cert and + GOOGLE_API_USE_CLIENT_CERTIFICATE is set to an unexpected value.) + """ + # check if google-auth version supports should_use_client_cert for automatic mTLS enablement + if hasattr(mtls, "should_use_client_cert"): + return mtls.should_use_client_cert() + else: + # if unsupported, fallback to reading from env var + use_client_cert_str = os.getenv( + "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false" + ).lower() + if use_client_cert_str not in ("true", "false"): + raise ValueError( + "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be" + " either `true` or `false`" + ) + return use_client_cert_str == "true" + @classmethod def from_service_account_info(cls, info: dict, *args, **kwargs): """Creates an instance of this client using the provided credentials @@ -390,12 +418,8 @@ def get_mtls_endpoint_and_cert_source( ) if client_options is None: client_options = client_options_lib.ClientOptions() - use_client_cert = os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") + use_client_cert = StorageClient._use_client_cert_effective() use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto") - if use_client_cert not in ("true", "false"): - raise ValueError( - "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" - ) if use_mtls_endpoint not in ("auto", "never", "always"): raise MutualTLSChannelError( "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" @@ -403,7 +427,7 @@ def get_mtls_endpoint_and_cert_source( # Figure out the client cert source to use. 
        client_cert_source = None
-        if use_client_cert == "true":
+        if use_client_cert:
             if client_options.client_cert_source:
                 client_cert_source = client_options.client_cert_source
             elif mtls.has_default_client_cert_source():
@@ -435,20 +459,14 @@ def _read_environment_variables():
             google.auth.exceptions.MutualTLSChannelError: If GOOGLE_API_USE_MTLS_ENDPOINT
                 is not any of ["auto", "never", "always"].
         """
-        use_client_cert = os.getenv(
-            "GOOGLE_API_USE_CLIENT_CERTIFICATE", "false"
-        ).lower()
+        use_client_cert = StorageClient._use_client_cert_effective()
         use_mtls_endpoint = os.getenv("GOOGLE_API_USE_MTLS_ENDPOINT", "auto").lower()
         universe_domain_env = os.getenv("GOOGLE_CLOUD_UNIVERSE_DOMAIN")
-        if use_client_cert not in ("true", "false"):
-            raise ValueError(
-                "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
-            )
         if use_mtls_endpoint not in ("auto", "never", "always"):
             raise MutualTLSChannelError(
                 "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
             )
-        return use_client_cert == "true", use_mtls_endpoint, universe_domain_env
+        return use_client_cert, use_mtls_endpoint, universe_domain_env
 
     @staticmethod
     def _get_client_cert_source(provided_cert_source, use_cert_flag):
diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
new file mode 100644
index 000000000..d34c844d5
--- /dev/null
+++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
@@ -0,0 +1,316 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+NOTE:
+This is an experimental module for the upcoming Rapid Storage support.
+(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new)
+
+APIs may not work as intended and are not stable yet. This feature is not
+GA (Generally Available) yet; please contact your TAM (Technical Account
+Manager) if you want to use these Rapid Storage APIs.
+
+"""
+from typing import Optional, Union
+from google.cloud import _storage_v2
+from google.cloud.storage._experimental.asyncio.async_grpc_client import (
+    AsyncGrpcClient,
+)
+from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
+    _AsyncWriteObjectStream,
+)
+
+
+_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024  # 2 MiB
+_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024  # 16 MiB
+
+
+class AsyncAppendableObjectWriter:
+    """Class for appending data to a GCS Appendable Object asynchronously."""
+
+    def __init__(
+        self,
+        client: AsyncGrpcClient.grpc_client,
+        bucket_name: str,
+        object_name: str,
+        generation=None,
+        write_handle=None,
+    ):
+        """
+        Class for appending data to a GCS Appendable Object.
+
+        Example usage:
+
+        ```
+
+        from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
+        from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import AsyncAppendableObjectWriter
+        import asyncio
+
+        client = AsyncGrpcClient().grpc_client
+        bucket_name = "my-bucket"
+        object_name = "my-appendable-object"
+
+        # instantiate the writer
+        writer = AsyncAppendableObjectWriter(client, bucket_name, object_name)
+        # open the writer (the underlying gRPC bidi-stream will be opened)
+        await writer.open()
+
+        # append data; this can be called multiple times.
+        await writer.append(b"hello world")
+        await writer.append(b"some more data")
+
+        # optionally flush data to persist it.
+        await writer.flush()
+
+        # close the gRPC stream.
+        # Please note that closing the program will also close the stream;
+        # however, it's recommended to close the stream once there is no more
+        # data to append, to clean up the gRPC connection (i.e., to free
+        # CPU/memory/network resources).
+        await writer.close()
+        ```
+
+        :type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client`
+        :param client: async gRPC client to use for making API requests.
+
+        :type bucket_name: str
+        :param bucket_name: The name of the GCS bucket containing the object.
+
+        :type object_name: str
+        :param object_name: The name of the GCS Appendable Object to be written.
+
+        :type generation: int
+        :param generation: (Optional) If present, selects a specific revision of
+            that object.
+            If None, a new object is created.
+            If None and the object already exists, it will be overwritten.
+
+        :type write_handle: bytes
+        :param write_handle: (Optional) An existing handle for writing the object.
+            If provided, opening the bidi-gRPC connection will be faster.
+        """
+        self.client = client
+        self.bucket_name = bucket_name
+        self.object_name = object_name
+        self.write_handle = write_handle
+        self.generation = generation
+
+        self.write_obj_stream = _AsyncWriteObjectStream(
+            client=self.client,
+            bucket_name=self.bucket_name,
+            object_name=self.object_name,
+            generation_number=self.generation,
+            write_handle=self.write_handle,
+        )
+        self._is_stream_open: bool = False
+        self.offset: Optional[int] = None
+        self.persisted_size: Optional[int] = None
+
+    async def state_lookup(self) -> int:
+        """Returns the persisted_size.
+
+        :rtype: int
+        :returns: persisted size.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+        """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before state_lookup().")
+
+        await self.write_obj_stream.send(
+            _storage_v2.BidiWriteObjectRequest(
+                state_lookup=True,
+            )
+        )
+        response = await self.write_obj_stream.recv()
+        self.persisted_size = response.persisted_size
+        return self.persisted_size
+
+    async def open(self) -> None:
+        """Opens the underlying bidi-gRPC stream.
+
+        :raises ValueError: If the stream is already open.
+
+        """
+        if self._is_stream_open:
+            raise ValueError("Underlying bidi-gRPC stream is already open")
+
+        await self.write_obj_stream.open()
+        self._is_stream_open = True
+        if self.generation is None:
+            self.generation = self.write_obj_stream.generation_number
+        self.write_handle = self.write_obj_stream.write_handle
+
+        # Update self.persisted_size
+        _ = await self.state_lookup()
+
+    async def append(self, data: bytes) -> None:
+        """Appends data to the Appendable object.
+
+        This method sends the provided data to the GCS server in chunks.
+        It maintains an internal threshold `_MAX_BUFFER_SIZE_BYTES` and will
+        automatically flush the data to make it visible to readers when that
+        threshold is reached.
+
+        :type data: bytes
+        :param data: The bytes to append to the object.
+
+        :rtype: None
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+        """
+
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before append().")
+        total_bytes = len(data)
+        if total_bytes == 0:
+            # TODO: add warning.
+            return
+        if self.offset is None:
+            assert self.persisted_size is not None
+            self.offset = self.persisted_size
+
+        start_idx = 0
+        bytes_to_flush = 0
+        while start_idx < total_bytes:
+            end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
+            await self.write_obj_stream.send(
+                _storage_v2.BidiWriteObjectRequest(
+                    write_offset=self.offset,
+                    checksummed_data=_storage_v2.ChecksummedData(
+                        content=data[start_idx:end_idx]
+                    ),
+                )
+            )
+            chunk_size = end_idx - start_idx
+            self.offset += chunk_size
+            bytes_to_flush += chunk_size
+            if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
+                await self.simple_flush()
+                bytes_to_flush = 0
+            start_idx = end_idx
+
+    async def simple_flush(self) -> None:
+        """Flushes the data to the server.
+
+        Please note: unlike `flush`, it does not do a `state_lookup`.
+
+        :rtype: None
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+        """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before simple_flush().")
+
+        await self.write_obj_stream.send(
+            _storage_v2.BidiWriteObjectRequest(
+                flush=True,
+            )
+        )
+
+    async def flush(self) -> int:
+        """Flushes the data to the server.
+
+        :rtype: int
+        :returns: The persisted size after flush.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+        """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before flush().")
+
+        await self.write_obj_stream.send(
+            _storage_v2.BidiWriteObjectRequest(
+                flush=True,
+                state_lookup=True,
+            )
+        )
+        response = await self.write_obj_stream.recv()
+        self.persisted_size = response.persisted_size
+        self.offset = self.persisted_size
+        return self.persisted_size
+
+    async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]:
+        """Closes the underlying bidi-gRPC stream.
+
+        :type finalize_on_close: bool
+        :param finalize_on_close: Finalizes the Appendable Object. No more data
+            can be appended.
+
+        :rtype: Union[int, _storage_v2.Object]
+        :returns: Updated `self.persisted_size` by default after closing the
+            bidi-gRPC stream. However, if `finalize_on_close=True` is passed,
+            returns the finalized object resource.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+
+        """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before close().")
+
+        if finalize_on_close:
+            await self.finalize()
+        else:
+            await self.flush()
+        await self.write_obj_stream.close()
+
+        self._is_stream_open = False
+        self.offset = None
+        return self.object_resource if finalize_on_close else self.persisted_size
+
+    async def finalize(self) -> _storage_v2.Object:
+        """Finalizes the Appendable Object.
+
+        Note: Once finalized, no more data can be appended.
+
+        :rtype: google.cloud.storage_v2.types.Object
+        :returns: The finalized object resource.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+        """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before finalize().")
+
+        await self.write_obj_stream.send(
+            _storage_v2.BidiWriteObjectRequest(finish_write=True)
+        )
+        response = await self.write_obj_stream.recv()
+        self.object_resource = response.resource
+        self.persisted_size = self.object_resource.size
+        return self.object_resource
+
+    # helper methods.
+    async def append_from_string(self, data: str):
+        """
+        The str data will be encoded to bytes using UTF-8 encoding, by calling
+
+        self.append(data.encode("utf-8"))
+        """
+        raise NotImplementedError("append_from_string is not implemented yet.")
+
+    async def append_from_stream(self, stream_obj):
+        """
+        Read a chunk of data (16 MiB) at a time from `stream_obj`
+        and call self.append(chunk)
+        """
+        raise NotImplementedError("append_from_stream is not implemented yet.")
+
+    async def append_from_file(self, file_path: str):
+        """Create a file object from `file_path` and call append_from_stream(file_obj)"""
+        raise NotImplementedError("append_from_file is not implemented yet.")
diff --git a/google/cloud/storage/_experimental/asyncio/async_grpc_client.py b/google/cloud/storage/_experimental/asyncio/async_grpc_client.py
index 75e6f63d2..a5cccca59 100644
--- a/google/cloud/storage/_experimental/asyncio/async_grpc_client.py
+++ b/google/cloud/storage/_experimental/asyncio/async_grpc_client.py
@@ -65,8 +65,10 @@ def _create_async_grpc_client(
         transport_cls = storage_v2.StorageAsyncClient.get_transport_class(
             "grpc_asyncio"
         )
-        channel = transport_cls.create_channel(attempt_direct_path=attempt_direct_path)
-        transport = transport_cls(credentials=credentials, channel=channel)
+        channel = transport_cls.create_channel(
+            attempt_direct_path=attempt_direct_path, credentials=credentials
+        )
+        transport = transport_cls(channel=channel)
 
         return storage_v2.StorageAsyncClient(
             transport=transport,
diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
index 07263ddd8..6d1fd5b31 100644
--- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
+++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py
@@ -95,15 +95,60 @@ async def open(self) -> None:
         """Opening an object for write should do its state lookup to know
         what the persisted size is.
         """
-        raise NotImplementedError(
-            "open() is not implemented yet in _AsyncWriteObjectStream"
+        if self._is_stream_open:
+            raise ValueError("Stream is already open")
+
+        # Create a new object or overwrite the existing one if generation_number
+        # is None. This makes it consistent with GCS JSON API behavior.
+        # The created object will be an Appendable Object.
+ if self.generation_number is None: + self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( + write_object_spec=_storage_v2.WriteObjectSpec( + resource=_storage_v2.Object( + name=self.object_name, bucket=self._full_bucket_name + ), + appendable=True, + ), + ) + else: + self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( + append_object_spec=_storage_v2.AppendObjectSpec( + bucket=self._full_bucket_name, + object=self.object_name, + generation=self.generation_number, + ), + state_lookup=True, + ) + + self.socket_like_rpc = AsyncBidiRpc( + self.rpc, initial_request=self.first_bidi_write_req, metadata=self.metadata ) + await self.socket_like_rpc.open() # this is actually 1 send + response = await self.socket_like_rpc.recv() + self._is_stream_open = True + + if not response.resource: + raise ValueError( + "Failed to obtain object resource after opening the stream" + ) + if not response.resource.generation: + raise ValueError( + "Failed to obtain object generation after opening the stream" + ) + self.generation_number = response.resource.generation + + if not response.write_handle: + raise ValueError("Failed to obtain write_handle after opening the stream") + + self.write_handle = response.write_handle + async def close(self) -> None: """Closes the bidi-gRPC connection.""" - raise NotImplementedError( - "close() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + await self.socket_like_rpc.close() + self._is_stream_open = False async def send( self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest @@ -115,9 +160,9 @@ async def send( The request message to send. This is typically used to specify the read offset and limit. """ - raise NotImplementedError( - "send() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + await self.socket_like_rpc.send(bidi_write_object_request) async def recv(self) -> _storage_v2.BidiWriteObjectResponse: """Receives a response from the stream. @@ -129,6 +174,10 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: :class:`~google.cloud._storage_v2.types.BidiWriteObjectResponse`: The response message from the server. 
""" - raise NotImplementedError( - "recv() is not implemented yet in _AsyncWriteObjectStream" - ) + if not self._is_stream_open: + raise ValueError("Stream is not open") + return await self.socket_like_rpc.recv() + + @property + def is_stream_open(self) -> bool: + return self._is_stream_open diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 682f8784d..24f72ad71 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -111,10 +111,6 @@ def _virtual_hosted_style_base_url(url, bucket, trailing_slash=False): return base_url -def _use_client_cert(): - return os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true" - - def _get_environ_project(): return os.getenv( environment_vars.PROJECT, diff --git a/google/cloud/storage/_media/_upload.py b/google/cloud/storage/_media/_upload.py index 765716882..4a919d18a 100644 --- a/google/cloud/storage/_media/_upload.py +++ b/google/cloud/storage/_media/_upload.py @@ -688,6 +688,13 @@ def _prepare_request(self): _CONTENT_TYPE_HEADER: self._content_type, _helpers.CONTENT_RANGE_HEADER: content_range, } + if (start_byte + len(payload) == self._total_bytes) and ( + self._checksum_object is not None + ): + local_checksum = _helpers.prepare_checksum_digest( + self._checksum_object.digest() + ) + headers["x-goog-hash"] = f"{self._checksum_type}={local_checksum}" return _PUT, self.resumable_url, payload, headers def _update_checksum(self, start_byte, payload): diff --git a/google/cloud/storage/_media/requests/download.py b/google/cloud/storage/_media/requests/download.py index b8e2758e1..13e049bd3 100644 --- a/google/cloud/storage/_media/requests/download.py +++ b/google/cloud/storage/_media/requests/download.py @@ -711,7 +711,7 @@ def __init__(self, checksum): super().__init__() self._checksum = checksum - def decompress(self, data): + def decompress(self, data, max_length=-1): """Decompress the bytes. Args: @@ -721,7 +721,11 @@ def decompress(self, data): bytes: The decompressed bytes from ``data``. """ self._checksum.update(data) - return super().decompress(data) + try: + return super().decompress(data, max_length=max_length) + except TypeError: + # Fallback for urllib3 < 2.6.0 which lacks `max_length` support. + return super().decompress(data) # urllib3.response.BrotliDecoder might not exist depending on whether brotli is @@ -747,7 +751,7 @@ def __init__(self, checksum): self._decoder = urllib3.response.BrotliDecoder() self._checksum = checksum - def decompress(self, data): + def decompress(self, data, max_length=-1): """Decompress the bytes. Args: @@ -757,10 +761,19 @@ def decompress(self, data): bytes: The decompressed bytes from ``data``. """ self._checksum.update(data) - return self._decoder.decompress(data) + try: + return self._decoder.decompress(data, max_length=max_length) + except TypeError: + # Fallback for urllib3 < 2.6.0 which lacks `max_length` support. 
+ return self._decoder.decompress(data) def flush(self): return self._decoder.flush() + @property + def has_unconsumed_tail(self) -> bool: + return self._decoder.has_unconsumed_tail + + else: # pragma: NO COVER _BrotliDecoder = None # type: ignore # pragma: NO COVER diff --git a/google/cloud/storage/bucket.py b/google/cloud/storage/bucket.py index 0d1f9192b..1621f879e 100644 --- a/google/cloud/storage/bucket.py +++ b/google/cloud/storage/bucket.py @@ -41,6 +41,7 @@ from google.cloud.storage._opentelemetry_tracing import create_trace_span from google.cloud.storage.acl import BucketACL from google.cloud.storage.acl import DefaultObjectACL +from google.cloud.storage.blob import _quote from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.constants import ARCHIVE_STORAGE_CLASS @@ -2360,7 +2361,10 @@ def move_blob( ) new_blob = Blob(bucket=self, name=new_name) - api_path = blob.path + "/moveTo/o/" + new_blob.name + api_path = "{blob_path}/moveTo/o/{new_name}".format( + blob_path=blob.path, new_name=_quote(new_blob.name) + ) + move_result = client._post_resource( api_path, None, diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index 3764c7a53..85575f067 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -20,11 +20,12 @@ import datetime import functools import json +import os import warnings import google.api_core.client_options from google.auth.credentials import AnonymousCredentials - +from google.auth.transport import mtls from google.api_core import page_iterator from google.cloud._helpers import _LocalStack from google.cloud.client import ClientWithProject @@ -35,7 +36,6 @@ from google.cloud.storage._helpers import _get_api_endpoint_override from google.cloud.storage._helpers import _get_environ_project from google.cloud.storage._helpers import _get_storage_emulator_override -from google.cloud.storage._helpers import _use_client_cert from google.cloud.storage._helpers import _virtual_hosted_style_base_url from google.cloud.storage._helpers import _DEFAULT_UNIVERSE_DOMAIN from google.cloud.storage._helpers import _DEFAULT_SCHEME @@ -218,7 +218,15 @@ def __init__( # The final decision of whether to use mTLS takes place in # google-auth-library-python. We peek at the environment variable # here only to issue an exception in case of a conflict. - if _use_client_cert(): + use_client_cert = False + if hasattr(mtls, "should_use_client_cert"): + use_client_cert = mtls.should_use_client_cert() + else: + use_client_cert = ( + os.getenv("GOOGLE_API_USE_CLIENT_CERTIFICATE") == "true" + ) + + if use_client_cert: raise ValueError( 'The "GOOGLE_API_USE_CLIENT_CERTIFICATE" env variable is ' 'set to "true" and a non-default universe domain is ' diff --git a/google/cloud/storage/version.py b/google/cloud/storage/version.py index 102b96095..dc87b3c5b 100644 --- a/google/cloud/storage/version.py +++ b/google/cloud/storage/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "3.6.0" +__version__ = "3.7.0" diff --git a/noxfile.py b/noxfile.py index 451fced3e..b89b9d319 100644 --- a/noxfile.py +++ b/noxfile.py @@ -74,9 +74,7 @@ def lint(session): session.run("flake8", "google", "tests") -# Use a python runtime which is available in the owlbot post processor here -# https://github.com/googleapis/synthtool/blob/master/docker/owlbot/python/Dockerfile -@nox.session(python=["3.10", DEFAULT_PYTHON_VERSION]) +@nox.session(python="3.14") def blacken(session): """Run black. @@ -234,7 +232,7 @@ def conftest_retry(session): test_cmd = ["py.test", "-n", "auto", "--quiet", conformance_test_folder_path] # Run py.test against the conformance tests. - session.run(*test_cmd) + session.run(*test_cmd, env={"DOCKER_API_VERSION": "1.39"}) @nox.session(python=DEFAULT_PYTHON_VERSION) diff --git a/samples/generated_samples/snippet_metadata_google.storage.v2.json b/samples/generated_samples/snippet_metadata_google.storage.v2.json index b2448bff2..4af7ef641 100644 --- a/samples/generated_samples/snippet_metadata_google.storage.v2.json +++ b/samples/generated_samples/snippet_metadata_google.storage.v2.json @@ -8,7 +8,7 @@ ], "language": "PYTHON", "name": "google-cloud-storage", - "version": "0.0.0" + "version": "3.6.0" }, "snippets": [ { diff --git a/samples/snippets/storage_list_buckets_partial_success.py b/samples/snippets/storage_list_buckets_partial_success.py new file mode 100644 index 000000000..bea4c9ed3 --- /dev/null +++ b/samples/snippets/storage_list_buckets_partial_success.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +# Copyright 2025 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START storage_list_buckets_partial_success] +from google.cloud import storage + + +def list_buckets_with_partial_success(): + """Lists buckets and includes unreachable buckets in the response.""" + + storage_client = storage.Client() + + buckets_iterator = storage_client.list_buckets(return_partial_success=True) + + for page in buckets_iterator.pages: + if page.unreachable: + print("Unreachable locations in this page:") + for location in page.unreachable: + print(location) + + print("Reachable buckets in this page:") + for bucket in page: + print(bucket.name) + + +# [END storage_list_buckets_partial_success] + + +if __name__ == "__main__": + list_buckets_with_partial_success() diff --git a/setup.py b/setup.py index 2c4504749..374a71cf4 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,18 @@ "google-crc32c >= 1.1.3, < 2.0.0", ] extras = { + # TODO: Make these extra dependencies as mandatory once gRPC out of + # experimental in this SDK. 
More info in b/465352227 + "grpc": [ + "google-api-core[grpc] >= 2.27.0, < 3.0.0", + "grpcio >= 1.33.2, < 2.0.0; python_version < '3.14'", + "grpcio >= 1.75.1, < 2.0.0; python_version >= '3.14'", + "grpcio-status >= 1.76.0, < 2.0.0", + "proto-plus >= 1.22.3, <2.0.0; python_version < '3.13'", + "proto-plus >= 1.25.0, <2.0.0; python_version >= '3.13'", + "protobuf>=3.20.2,<7.0.0,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", + "grpc-google-iam-v1 >= 0.14.0, <1.0.0", + ], "protobuf": ["protobuf >= 3.20.2, < 7.0.0"], "tracing": [ "opentelemetry-api >= 1.1.0, < 2.0.0", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 9c17b387b..151762409 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,7 +5,7 @@ # e.g., if setup.py has "google-cloud-foo >= 1.14.0, < 2.0.0", # Then this file should have google-cloud-foo==1.14.0 google-auth==2.26.1 -google-api-core==2.15.0 +google-api-core==2.27.0 google-cloud-core==2.4.2 google-resumable-media==2.7.2 requests==2.22.0 diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index ccf6c1493..f022e9e1c 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -5,7 +5,7 @@ # e.g., if setup.py has "google-cloud-foo >= 1.14.0, < 2.0.0", # Then this file should have google-cloud-foo==1.14.0 google-auth==2.26.1 -google-api-core>=2.15.0 +google-api-core==2.27.0 google-cloud-core==2.4.2 google-resumable-media==2.7.2 requests==2.22.0 diff --git a/tests/resumable_media/system/requests/test_upload.py b/tests/resumable_media/system/requests/test_upload.py index dd90aa53b..47f4f6003 100644 --- a/tests/resumable_media/system/requests/test_upload.py +++ b/tests/resumable_media/system/requests/test_upload.py @@ -27,7 +27,6 @@ import google.cloud.storage._media.requests as resumable_requests from google.cloud.storage._media import _helpers from .. import utils -from google.cloud.storage._media import _upload from google.cloud.storage.exceptions import InvalidResponse from google.cloud.storage.exceptions import DataCorruption @@ -372,29 +371,6 @@ def test_resumable_upload_with_headers( _resumable_upload_helper(authorized_transport, img_stream, cleanup, headers=headers) -@pytest.mark.parametrize("checksum", ["md5", "crc32c"]) -def test_resumable_upload_with_bad_checksum( - authorized_transport, img_stream, bucket, cleanup, checksum -): - fake_checksum_object = _helpers._get_checksum_object(checksum) - fake_checksum_object.update(b"bad data") - fake_prepared_checksum_digest = _helpers.prepare_checksum_digest( - fake_checksum_object.digest() - ) - with mock.patch.object( - _helpers, "prepare_checksum_digest", return_value=fake_prepared_checksum_digest - ): - with pytest.raises(DataCorruption) as exc_info: - _resumable_upload_helper( - authorized_transport, img_stream, cleanup, checksum=checksum - ) - expected_checksums = {"md5": "1bsd83IYNug8hd+V1ING3Q==", "crc32c": "YQGPxA=="} - expected_message = _upload._UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format( - checksum.upper(), fake_prepared_checksum_digest, expected_checksums[checksum] - ) - assert exc_info.value.args[0] == expected_message - - def test_resumable_upload_bad_chunk_size(authorized_transport, img_stream): blob_name = os.path.basename(img_stream.name) # Create the actual upload object. 
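The move_blob change in bucket.py above builds the moveTo path with the destination object name percent-encoded through the library-internal `_quote` helper. A minimal sketch of that path construction, using `urllib.parse.quote` with `safe=""` as a stand-in for `_quote` (an assumption; the real helper may handle additional cases), which the system test below exercises with a destination name containing `/`:

```python
from urllib.parse import quote

def move_to_path(blob_path: str, new_name: str) -> str:
    # Percent-encode the destination name so characters like "/" are not
    # interpreted as URL path separators.
    return "{blob_path}/moveTo/o/{new_name}".format(
        blob_path=blob_path, new_name=quote(new_name, safe="")
    )

print(move_to_path("/b/my-bucket/o/source", "dest/dest_file.txt"))
# -> /b/my-bucket/o/source/moveTo/o/dest%2Fdest_file.txt
```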
diff --git a/tests/system/test_bucket.py b/tests/system/test_bucket.py
index 602e407cc..32806bd4c 100644
--- a/tests/system/test_bucket.py
+++ b/tests/system/test_bucket.py
@@ -472,6 +472,36 @@ def test_bucket_move_blob_hns(
     assert source_gen != dest.generation
 
 
+def test_bucket_move_blob_with_name_needs_encoding(
+    storage_client,
+    buckets_to_delete,
+    blobs_to_delete,
+):
+    payload = b"move_blob_with_name_which_has_a_char_that_needs_url_encoding"
+
+    bucket_name = _helpers.unique_name("move-blob")
+    bucket_obj = storage_client.bucket(bucket_name)
+    created = _helpers.retry_429_503(storage_client.create_bucket)(bucket_obj)
+    buckets_to_delete.append(created)
+
+    source = created.blob("source")
+    source.upload_from_string(payload)
+    source_gen = source.generation
+    blobs_to_delete.append(source)
+
+    dest = created.move_blob(
+        source,
+        "dest/dest_file.txt",
+        if_source_generation_match=source.generation,
+        if_source_metageneration_match=source.metageneration,
+    )
+    blobs_to_delete.append(dest)
+
+    assert dest.download_as_bytes() == payload
+    assert dest.generation is not None
+    assert source_gen != dest.generation
+
+
 def test_bucket_get_blob_with_user_project(
     storage_client,
     buckets_to_delete,
@@ -1339,6 +1369,7 @@ def test_bucket_ip_filter_patch(storage_client, buckets_to_delete):
     assert len(reloaded_filter.vpc_network_sources) == 1
 
 
+@pytest.mark.skip(reason="https://github.com/googleapis/python-storage/issues/1611")
 def test_list_buckets_with_ip_filter(storage_client, buckets_to_delete):
     """Test that listing buckets returns a summarized IP filter."""
     bucket_name = _helpers.unique_name("ip-filter-list")
diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py
new file mode 100644
index 000000000..909b9ddf1
--- /dev/null
+++ b/tests/system/test_zonal.py
@@ -0,0 +1,59 @@
+# py standard imports
+import os
+import uuid
+from io import BytesIO
+
+# python additional imports
+import pytest
+
+# current library imports
+from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
+from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+    AsyncAppendableObjectWriter,
+)
+from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
+    AsyncMultiRangeDownloader,
+)
+
+pytestmark = pytest.mark.skipif(
+    os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
+    reason="Zonal system tests need to be explicitly enabled. This helps with scheduling tests in Kokoro and Cloud Build.",
+)
+
+
+# TODO: replace this with a fixture once zonal bucket creation / deletion
+# is supported in the grpc client or the json client.
+_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET")
+
+
+@pytest.mark.asyncio
+async def test_basic_wrd(storage_client, blobs_to_delete):
+    bytes_to_upload = b"dummy_bytes_to_write_read_and_delete_appendable_object"
+    object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
+
+    # Client instantiation; it cannot be part of a fixture because the
+    # grpc_client's event loop and the event loop of the coroutine running it
+    # (i.e. this test) must be the same.
+    # Note:
+    # 1. @pytest.mark.asyncio ensures a new event loop for each test.
+    # 2. we could keep the same event loop for the entire module, but that may
+    # create issues if tests are run in parallel and one test hogs the event
+    # loop, slowing down other tests. 
+ grpc_client = AsyncGrpcClient().grpc_client + + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(bytes_to_upload) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == len(bytes_to_upload) + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == bytes_to_upload + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py new file mode 100644 index 000000000..a75824f8b --- /dev/null +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -0,0 +1,498 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) +from google.cloud import _storage_v2 + + +BUCKET = "test-bucket" +OBJECT = "test-object" +GENERATION = 123 +WRITE_HANDLE = b"test-write-handle" +PERSISTED_SIZE = 456 + + +@pytest.fixture +def mock_client(): + """Mock the async gRPC client.""" + return mock.AsyncMock() + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +def test_init(mock_write_object_stream, mock_client): + """Test the constructor of AsyncAppendableObjectWriter.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + + assert writer.client == mock_client + assert writer.bucket_name == BUCKET + assert writer.object_name == OBJECT + assert writer.generation is None + assert writer.write_handle is None + assert not writer._is_stream_open + assert writer.offset is None + assert writer.persisted_size is None + + mock_write_object_stream.assert_called_once_with( + client=mock_client, + bucket_name=BUCKET, + object_name=OBJECT, + generation_number=None, + write_handle=None, + ) + assert writer.write_obj_stream == mock_write_object_stream.return_value + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +def test_init_with_optional_args(mock_write_object_stream, mock_client): + """Test the constructor with optional arguments.""" + writer = AsyncAppendableObjectWriter( + mock_client, + BUCKET, + OBJECT, + generation=GENERATION, + write_handle=WRITE_HANDLE, + ) + + assert writer.generation == GENERATION + assert writer.write_handle == WRITE_HANDLE + + mock_write_object_stream.assert_called_once_with( + client=mock_client, + bucket_name=BUCKET, + object_name=OBJECT, + generation_number=GENERATION, + write_handle=WRITE_HANDLE, + ) + + 
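The writer unit tests in this file exercise the same lifecycle as the zonal system test above; a minimal usage sketch for orientation (the bucket and object names are placeholders, and this API is experimental, so names may change):

import asyncio

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)


async def append_example():
    # Must be created on the same event loop that drives the writer.
    grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(grpc_client, "a-zonal-bucket", "an-object")
    await writer.open()            # opens the bidi stream, primes generation/write_handle
    await writer.append(b"hello")  # buffered, chunked sends over the stream
    await writer.flush()           # flush + state_lookup; returns the persisted size
    # finalize_on_close=True finalizes the object and returns its resource.
    return await writer.close(finalize_on_close=True)


asyncio.run(append_example())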
+@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_state_lookup(mock_write_object_stream, mock_client): + """Test state_lookup method.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=PERSISTED_SIZE) + ) + + expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) + + # Act + response = await writer.state_lookup() + + # Assert + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert writer.persisted_size == PERSISTED_SIZE + assert response == PERSISTED_SIZE + + +@pytest.mark.asyncio +async def test_state_lookup_without_open_raises_value_error(mock_client): + """Test that state_lookup raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, + match="Stream is not open. Call open\\(\\) before state_lookup\\(\\).", + ): + await writer.state_lookup() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_open_appendable_object_writer(mock_write_object_stream, mock_client): + """Test the open method.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + mock_stream = mock_write_object_stream.return_value + mock_stream.open = mock.AsyncMock() + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock() + + mock_state_response = mock.MagicMock() + mock_state_response.persisted_size = 1024 + mock_stream.recv.return_value = mock_state_response + + mock_stream.generation_number = GENERATION + mock_stream.write_handle = WRITE_HANDLE + + # Act + await writer.open() + + # Assert + mock_stream.open.assert_awaited_once() + assert writer._is_stream_open + assert writer.generation == GENERATION + assert writer.write_handle == WRITE_HANDLE + + expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True) + mock_stream.send.assert_awaited_once_with(expected_request) + mock_stream.recv.assert_awaited_once() + assert writer.persisted_size == 1024 + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_open_when_already_open_raises_error( + mock_write_object_stream, mock_client +): + """Test that opening an already open writer raises a ValueError.""" + # Arrange + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True # Manually set to open + + # Act & Assert + with pytest.raises(ValueError, match="Underlying bidi-gRPC stream is already open"): + await writer.open() + + +@pytest.mark.asyncio +async def test_unimplemented_methods_raise_error(mock_client): + """Test that all currently unimplemented methods raise NotImplementedError.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + + with pytest.raises(NotImplementedError): + await writer.append_from_string("data") + + with pytest.raises(NotImplementedError): + await writer.append_from_stream(mock.Mock()) + + with pytest.raises(NotImplementedError): + await writer.append_from_file("file.txt") + + 
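The flush-related tests that follow pin down the exact requests sent over the bidi stream; the two variants differ only in whether a state lookup (and therefore a recv) is involved. For reference:

from google.cloud import _storage_v2

# flush(): asks the server to persist and report back, so the writer reads a response.
full_flush = _storage_v2.BidiWriteObjectRequest(flush=True, state_lookup=True)

# simple_flush(): fire-and-forget, no response is read.
fire_and_forget = _storage_v2.BidiWriteObjectRequest(flush=True)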
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_flush(mock_write_object_stream, mock_client):
+    """Test that flush sends the correct request and updates state."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024)
+    )
+
+    persisted_size = await writer.flush()
+
+    expected_request = _storage_v2.BidiWriteObjectRequest(flush=True, state_lookup=True)
+    mock_stream.send.assert_awaited_once_with(expected_request)
+    mock_stream.recv.assert_awaited_once()
+    assert writer.persisted_size == 1024
+    assert writer.offset == 1024
+    assert persisted_size == 1024
+
+
+@pytest.mark.asyncio
+async def test_flush_without_open_raises_value_error(mock_client):
+    """Test that flush raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Stream is not open. Call open\\(\\) before flush\\(\\)."
+    ):
+        await writer.flush()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_simple_flush(mock_write_object_stream, mock_client):
+    """Test that simple_flush sends a flush-only request without waiting for a response."""
+    # Arrange
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+
+    # Act
+    await writer.simple_flush()
+
+    # Assert
+    mock_stream.send.assert_awaited_once_with(
+        _storage_v2.BidiWriteObjectRequest(flush=True)
+    )
+
+
+@pytest.mark.asyncio
+async def test_simple_flush_without_open_raises_value_error(mock_client):
+    """Test that simple_flush raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError,
+        match="Stream is not open. Call open\\(\\) before simple_flush\\(\\).",
+    ):
+        await writer.simple_flush()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_close(mock_write_object_stream, mock_client):
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    writer.offset = 1024
+    mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024)
+    )
+    mock_stream.close = mock.AsyncMock()
+    writer.finalize = mock.AsyncMock()
+
+    persisted_size = await writer.close()
+
+    writer.finalize.assert_not_awaited()
+    mock_stream.close.assert_awaited_once()
+    assert writer.offset is None
+    assert persisted_size == 1024
+    assert not writer._is_stream_open
+
+
+@pytest.mark.asyncio
+async def test_close_without_open_raises_value_error(mock_client):
+    """Test that close raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Stream is not open. Call open\\(\\) before close\\(\\)." 
+ ): + await writer.close() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_finalize_on_close(mock_write_object_stream, mock_client): + """Test close with finalizing.""" + # Arrange + mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=2048) + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.offset = 1024 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) + ) + mock_stream.close = mock.AsyncMock() + + # Act + result = await writer.close(finalize_on_close=True) + + # Assert + mock_stream.close.assert_not_awaited() # Based on new implementation + assert not writer._is_stream_open + assert writer.offset is None + assert writer.object_resource == mock_resource + assert writer.persisted_size == 2048 + assert result == mock_resource + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_finalize(mock_write_object_stream, mock_client): + """Test that finalize sends the correct request and updates state.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=123) + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + mock_stream.recv = mock.AsyncMock( + return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource) + ) + + gcs_object = await writer.finalize() + + mock_stream.send.assert_awaited_once_with( + _storage_v2.BidiWriteObjectRequest(finish_write=True) + ) + mock_stream.recv.assert_awaited_once() + assert writer.object_resource == mock_resource + assert writer.persisted_size == 123 + assert gcs_object == mock_resource + + +@pytest.mark.asyncio +async def test_finalize_without_open_raises_value_error(mock_client): + """Test that finalize raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Stream is not open. Call open\\(\\) before finalize\\(\\)." + ): + await writer.finalize() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_raises_error_if_not_open(mock_write_object_stream, mock_client): + """Test that append raises an error if the stream is not open.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + with pytest.raises( + ValueError, match="Stream is not open. Call open\\(\\) before append\\(\\)." 
+ ): + await writer.append(b"some data") + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_with_empty_data(mock_write_object_stream, mock_client): + """Test that append does nothing if data is empty.""" + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + + await writer.append(b"") + + mock_stream.send.assert_not_awaited() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client): + """Test that append sends data in chunks and updates offset.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_CHUNK_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 100 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() + + data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1) + await writer.append(data) + + assert mock_stream.send.await_count == 2 + first_call = mock_stream.send.await_args_list[0] + second_call = mock_stream.send.await_args_list[1] + + # First chunk + assert first_call[0][0].write_offset == 100 + assert len(first_call[0][0].checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES + + # Second chunk + assert second_call[0][0].write_offset == 100 + _MAX_CHUNK_SIZE_BYTES + assert len(second_call[0][0].checksummed_data.content) == 1 + + assert writer.offset == 100 + len(data) + writer.simple_flush.assert_not_awaited() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_flushes_when_buffer_is_full( + mock_write_object_stream, mock_client +): + """Test that append flushes the stream when the buffer size is reached.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_BUFFER_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 0 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() + + data = b"a" * _MAX_BUFFER_SIZE_BYTES + await writer.append(data) + + writer.simple_flush.assert_awaited_once() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_handles_large_data(mock_write_object_stream, mock_client): + """Test that append handles data larger than the buffer size.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_BUFFER_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 0 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() + + data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1) + await writer.append(data) + + assert writer.simple_flush.await_count == 2 + + +@pytest.mark.asyncio +@mock.patch( + 
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_data_two_times(mock_write_object_stream, mock_client): + """Test that append sends data correctly when called multiple times.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_CHUNK_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 0 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.simple_flush = mock.AsyncMock() + + data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10) + await writer.append(data1) + + data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20) + await writer.append(data2) + + total_data_length = len(data1) + len(data2) + assert writer.offset == total_data_length + assert writer.simple_flush.await_count == 0 diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py index 0e2bf9b50..eb06ab938 100644 --- a/tests/unit/asyncio/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -15,6 +15,7 @@ import unittest from unittest import mock from google.auth import credentials as auth_credentials +from google.auth.credentials import AnonymousCredentials def _make_credentials(spec=None): @@ -38,12 +39,10 @@ def test_constructor_default_options(self, mock_async_storage_client): "grpc_asyncio" ) mock_transport_cls.create_channel.assert_called_once_with( - attempt_direct_path=True + attempt_direct_path=True, credentials=mock_creds ) mock_channel = mock_transport_cls.create_channel.return_value - mock_transport_cls.assert_called_once_with( - credentials=mock_creds, channel=mock_channel - ) + mock_transport_cls.assert_called_once_with(channel=mock_channel) mock_transport = mock_transport_cls.return_value mock_async_storage_client.assert_called_once_with( transport=mock_transport, @@ -64,21 +63,77 @@ def test_constructor_disables_directpath(self, mock_async_storage_client): ) mock_transport_cls.create_channel.assert_called_once_with( - attempt_direct_path=False + attempt_direct_path=False, credentials=mock_creds ) mock_channel = mock_transport_cls.create_channel.return_value - mock_transport_cls.assert_called_once_with( - credentials=mock_creds, channel=mock_channel - ) + mock_transport_cls.assert_called_once_with(channel=mock_channel) @mock.patch("google.cloud._storage_v2.StorageAsyncClient") - def test_grpc_client_property(self, mock_async_storage_client): + def test_grpc_client_property(self, mock_grpc_gapic_client): from google.cloud.storage._experimental.asyncio import async_grpc_client + # Arrange + mock_transport_cls = mock.MagicMock() + mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls + channel_sentinel = mock.sentinel.channel + + mock_transport_cls.create_channel.return_value = channel_sentinel + mock_transport_cls.return_value = mock.sentinel.transport + mock_creds = _make_credentials() + mock_client_info = mock.sentinel.client_info + mock_client_options = mock.sentinel.client_options + mock_attempt_direct_path = mock.sentinel.attempt_direct_path + + # Act + client = async_grpc_client.AsyncGrpcClient( + credentials=mock_creds, + client_info=mock_client_info, + client_options=mock_client_options, + attempt_direct_path=mock_attempt_direct_path, + ) + + mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls + + mock_transport_cls.create_channel.return_value = 
channel_sentinel
+        mock_transport_instance = mock.sentinel.transport
+        mock_transport_cls.return_value = mock_transport_instance
+
+        retrieved_client = client.grpc_client
+
+        # Assert
+        mock_transport_cls.create_channel.assert_called_once_with(
+            attempt_direct_path=mock_attempt_direct_path, credentials=mock_creds
+        )
+        mock_transport_cls.assert_called_once_with(channel=channel_sentinel)
+        mock_grpc_gapic_client.assert_called_once_with(
+            transport=mock_transport_instance,
+            client_info=mock_client_info,
+            client_options=mock_client_options,
+        )
+        self.assertIs(retrieved_client, mock_grpc_gapic_client.return_value)
 
-        client = async_grpc_client.AsyncGrpcClient(credentials=mock_creds)
+    @mock.patch("google.cloud._storage_v2.StorageAsyncClient")
+    def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client):
+        from google.cloud.storage._experimental.asyncio import async_grpc_client
+
+        # Arrange
+        mock_transport_cls = mock.MagicMock()
+        mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls
+        channel_sentinel = mock.sentinel.channel
+
+        mock_transport_cls.create_channel.return_value = channel_sentinel
+        mock_transport_cls.return_value = mock.sentinel.transport
+
+        # Act
+        anonymous_creds = AnonymousCredentials()
+        client = async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds)
         retrieved_client = client.grpc_client
 
-        self.assertIs(retrieved_client, mock_async_storage_client.return_value)
+        # Assert
+        self.assertIs(retrieved_client, mock_grpc_gapic_client.return_value)
+
+        mock_transport_cls.create_channel.assert_called_once_with(
+            attempt_direct_path=True, credentials=anonymous_creds
+        )
+        mock_transport_cls.assert_called_once_with(channel=channel_sentinel)
diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py
index 9834b79c9..7fa2123c5 100644
--- a/tests/unit/asyncio/test_async_write_object_stream.py
+++ b/tests/unit/asyncio/test_async_write_object_stream.py
@@ -15,6 +15,7 @@
 
 import pytest
 from unittest import mock
+from unittest.mock import AsyncMock
 from google.cloud.storage._experimental.asyncio.async_write_object_stream import (
     _AsyncWriteObjectStream,
 )
@@ -22,6 +23,8 @@
 
 BUCKET = "my-bucket"
 OBJECT = "my-object"
+GENERATION = 12345
+WRITE_HANDLE = b"test-handle"
 
 
 @pytest.fixture
@@ -41,6 +44,28 @@ def mock_client():
     return client
 
 
+async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, open=True):
+    """Helper to create an instance of _AsyncWriteObjectStream and open it by default."""
+    socket_like_rpc = AsyncMock()
+    mock_cls_async_bidi_rpc.return_value = socket_like_rpc
+    socket_like_rpc.open = AsyncMock()
+    socket_like_rpc.send = AsyncMock()
+    socket_like_rpc.close = AsyncMock()
+
+    mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
+    mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
+    mock_response.resource.generation = GENERATION
+    mock_response.write_handle = WRITE_HANDLE
+    socket_like_rpc.recv = AsyncMock(return_value=mock_response)
+
+    write_obj_stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
+
+    if open:
+        await write_obj_stream.open()
+
+    return write_obj_stream
+
+
 def test_async_write_object_stream_init(mock_client):
     """Test the constructor of _AsyncWriteObjectStream."""
     stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
@@ -91,18 +116,275 @@ def test_async_write_object_stream_init_raises_value_error():
 
 
 @pytest.mark.asyncio
-async def test_unimplemented_methods_raise_error(mock_client):
-    """Test that 
unimplemented methods raise NotImplementedError.""" +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): + """Test opening a stream for a new object.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + + # Act + await stream.open() + + # Assert + assert stream._is_stream_open + socket_like_rpc.open.assert_called_once() + socket_like_rpc.recv.assert_called_once() + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): + """Test opening a stream for an existing object.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream( + mock_client, BUCKET, OBJECT, generation_number=GENERATION + ) + + # Act + await stream.open() + + # Assert + assert stream._is_stream_open + socket_like_rpc.open.assert_called_once() + socket_like_rpc.recv.assert_called_once() + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_client): + """Test that opening an already open stream raises a ValueError.""" + # Arrange + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + socket_like_rpc.open = mock.AsyncMock() + + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) + mock_response.resource.generation = GENERATION + mock_response.write_handle = WRITE_HANDLE + socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + await stream.open() + + # Act & Assert + with pytest.raises(ValueError, match="Stream is already open"): + await stream.open() + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_object_resource( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if object_resource is not in the response.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + + mock_reponse = mock.AsyncMock() + type(mock_reponse).resource = mock.PropertyMock(return_value=None) 
+    socket_like_rpc.recv.return_value = mock_reponse
+
+    # Note: don't build the response via the code below, as the mock library
+    # automatically assigns an `AsyncMock` to any attribute that is not set.
+    # socket_like_rpc.recv.return_value = mock.AsyncMock(
+    #     return_value=_storage_v2.BidiWriteObjectResponse(resource=None)
+    # )
+
+    stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Failed to obtain object resource after opening the stream"
+    ):
+        await stream.open()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
+)
+async def test_open_raises_error_on_missing_generation(
+    mock_async_bidi_rpc, mock_client
+):
+    """Test that open raises ValueError if generation is not in the response."""
+    socket_like_rpc = mock.AsyncMock()
+    mock_async_bidi_rpc.return_value = socket_like_rpc
+
+    # Configure the mock response object
+    mock_response = mock.AsyncMock()
+    type(mock_response.resource).generation = mock.PropertyMock(return_value=None)
+    socket_like_rpc.recv.return_value = mock_response
+    stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Failed to obtain object generation after opening the stream"
+    ):
+        await stream.open()
+
 
-    with pytest.raises(NotImplementedError):
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
+)
+async def test_open_raises_error_on_missing_write_handle(
+    mock_async_bidi_rpc, mock_client
+):
+    """Test that open raises ValueError if write_handle is not in the response."""
+    socket_like_rpc = mock.AsyncMock()
+    mock_async_bidi_rpc.return_value = socket_like_rpc
+    socket_like_rpc.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(
+            resource=_storage_v2.Object(generation=GENERATION), write_handle=None
+        )
+    )
+    stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT)
+    with pytest.raises(ValueError, match="Failed to obtain write_handle"):
         await stream.open()
 
-    with pytest.raises(NotImplementedError):
-        await stream.close()
 
-    with pytest.raises(NotImplementedError):
-        await stream.send(_storage_v2.BidiWriteObjectRequest())
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
+)
+async def test_close(mock_cls_async_bidi_rpc, mock_client):
+    """Test that close successfully closes the stream."""
+    # Arrange
+    write_obj_stream = await instantiate_write_obj_stream(
+        mock_client, mock_cls_async_bidi_rpc, open=True
+    )
+
+    # Act
+    await write_obj_stream.close()
+
+    # Assert
+    write_obj_stream.socket_like_rpc.close.assert_called_once()
+    assert not write_obj_stream.is_stream_open
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
+)
+async def test_close_without_open_should_raise_error(
+    mock_cls_async_bidi_rpc, mock_client
+):
+    """Test that closing a stream that is not open raises a ValueError."""
+    # Arrange
+    write_obj_stream = await instantiate_write_obj_stream(
+        mock_client, mock_cls_async_bidi_rpc, open=False
+    )
+
+    # Act & Assert
+    with pytest.raises(ValueError, match="Stream is not open"):
+        await write_obj_stream.close()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc"
+)
+async def test_send(mock_cls_async_bidi_rpc, mock_client):
+    """Test that send calls the underlying rpc's send 
method.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) + + # Act + bidi_write_object_request = _storage_v2.BidiWriteObjectRequest() + await write_obj_stream.send(bidi_write_object_request) + + # Assert + write_obj_stream.socket_like_rpc.send.assert_called_once_with( + bidi_write_object_request + ) + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_send_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + """Test that sending on a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) + + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.send(_storage_v2.BidiWriteObjectRequest()) + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_recv(mock_cls_async_bidi_rpc, mock_client): + """Test that recv calls the underlying rpc's recv method.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=True + ) + bidi_write_object_response = _storage_v2.BidiWriteObjectResponse() + write_obj_stream.socket_like_rpc.recv = AsyncMock( + return_value=bidi_write_object_response + ) + + # Act + response = await write_obj_stream.recv() + + # Assert + write_obj_stream.socket_like_rpc.recv.assert_called_once() + assert response == bidi_write_object_response + + +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_recv_without_open_should_raise_error( + mock_cls_async_bidi_rpc, mock_client +): + """Test that receiving on a stream that is not open raises a ValueError.""" + # Arrange + write_obj_stream = await instantiate_write_obj_stream( + mock_client, mock_cls_async_bidi_rpc, open=False + ) - with pytest.raises(NotImplementedError): - await stream.recv() + # Act & Assert + with pytest.raises(ValueError, match="Stream is not open"): + await write_obj_stream.recv() diff --git a/tests/unit/gapic/storage_v2/test_storage.py b/tests/unit/gapic/storage_v2/test_storage.py index 20b680341..7b6340aa7 100644 --- a/tests/unit/gapic/storage_v2/test_storage.py +++ b/tests/unit/gapic/storage_v2/test_storage.py @@ -148,12 +148,19 @@ def test__read_environment_variables(): with mock.patch.dict( os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"} ): - with pytest.raises(ValueError) as excinfo: - StorageClient._read_environment_variables() - assert ( - str(excinfo.value) - == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" - ) + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with pytest.raises(ValueError) as excinfo: + StorageClient._read_environment_variables() + assert ( + str(excinfo.value) + == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`" + ) + else: + assert StorageClient._read_environment_variables() == ( + False, + "auto", + None, + ) with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "never"}): assert StorageClient._read_environment_variables() == (False, "never", None) @@ -176,6 +183,105 @@ def test__read_environment_variables(): assert StorageClient._read_environment_variables() == 
(False, "auto", "foo.com") +def test_use_client_cert_effective(): + # Test case 1: Test when `should_use_client_cert` returns True. + # We mock the `should_use_client_cert` function to simulate a scenario where + # the google-auth library supports automatic mTLS and determines that a + # client certificate should be used. + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch( + "google.auth.transport.mtls.should_use_client_cert", return_value=True + ): + assert StorageClient._use_client_cert_effective() is True + + # Test case 2: Test when `should_use_client_cert` returns False. + # We mock the `should_use_client_cert` function to simulate a scenario where + # the google-auth library supports automatic mTLS and determines that a + # client certificate should NOT be used. + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch( + "google.auth.transport.mtls.should_use_client_cert", return_value=False + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 3: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "true". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "true"}): + assert StorageClient._use_client_cert_effective() is True + + # Test case 4: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "false". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "false"} + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 5: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "True". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "True"}): + assert StorageClient._use_client_cert_effective() is True + + # Test case 6: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "False". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "False"} + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 7: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "TRUE". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "TRUE"}): + assert StorageClient._use_client_cert_effective() is True + + # Test case 8: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to "FALSE". + if not hasattr(google.auth.transport.mtls, "should_use_client_cert"): + with mock.patch.dict( + os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "FALSE"} + ): + assert StorageClient._use_client_cert_effective() is False + + # Test case 9: Test when `should_use_client_cert` is unavailable and the + # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is not set. + # In this case, the method should return False, which is the default value. 
+    if not hasattr(google.auth.transport.mtls, "should_use_client_cert"):
+        with mock.patch.dict(os.environ, clear=True):
+            assert StorageClient._use_client_cert_effective() is False
+
+    # Test case 10: Test when `should_use_client_cert` is unavailable and the
+    # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to an invalid value.
+    # The method should raise a ValueError as the environment variable must be either
+    # "true" or "false".
+    if not hasattr(google.auth.transport.mtls, "should_use_client_cert"):
+        with mock.patch.dict(
+            os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "unsupported"}
+        ):
+            with pytest.raises(ValueError):
+                StorageClient._use_client_cert_effective()
+
+    # Test case 11: Test when `should_use_client_cert` is available and the
+    # `GOOGLE_API_USE_CLIENT_CERTIFICATE` environment variable is set to an invalid value.
+    # The invalid value is ignored and the method should return False.
+    if hasattr(google.auth.transport.mtls, "should_use_client_cert"):
+        with mock.patch.dict(
+            os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "unsupported"}
+        ):
+            assert StorageClient._use_client_cert_effective() is False
+
+    # Test case 12: Test when `should_use_client_cert` is available and both the
+    # `GOOGLE_API_USE_CLIENT_CERTIFICATE` and `GOOGLE_API_CERTIFICATE_CONFIG`
+    # environment variables are set to the empty string.
+    if hasattr(google.auth.transport.mtls, "should_use_client_cert"):
+        with mock.patch.dict(os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": ""}):
+            with mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""}):
+                assert StorageClient._use_client_cert_effective() is False
+
+
 def test__get_client_cert_source():
     mock_provided_cert_source = mock.Mock()
     mock_default_cert_source = mock.Mock()
@@ -515,17 +621,6 @@ def test_storage_client_client_options(client_class, transport_class, transport_
             == "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`"
         )
 
-    # Check the case GOOGLE_API_USE_CLIENT_CERTIFICATE has unsupported value.
-    with mock.patch.dict(
-        os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"}
-    ):
-        with pytest.raises(ValueError) as excinfo:
-            client = client_class(transport=transport_name)
-        assert (
-            str(excinfo.value)
-            == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
-        )
-
     # Check the case quota_project_id is provided
     options = client_options.ClientOptions(quota_project_id="octopus")
     with mock.patch.object(transport_class, "__init__") as patched:
@@ -733,6 +828,119 @@ def test_storage_client_get_mtls_endpoint_and_cert_source(client_class):
     assert api_endpoint == mock_api_endpoint
     assert cert_source is None
 
+    # Test the case GOOGLE_API_USE_CLIENT_CERTIFICATE is "Unsupported".
+    with mock.patch.dict(
+        os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"}
+    ):
+        if hasattr(google.auth.transport.mtls, "should_use_client_cert"):
+            mock_client_cert_source = mock.Mock()
+            mock_api_endpoint = "foo"
+            options = client_options.ClientOptions(
+                client_cert_source=mock_client_cert_source,
+                api_endpoint=mock_api_endpoint,
+            )
+            api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source(
+                options
+            )
+            assert api_endpoint == mock_api_endpoint
+            assert cert_source is None
+
+    # Test cases for mTLS enablement when GOOGLE_API_USE_CLIENT_CERTIFICATE is unset.
+    test_cases = [
+        (
+            # With workloads present in config, mTLS is enabled. 
+ { + "version": 1, + "cert_configs": { + "workload": { + "cert_path": "path/to/cert/file", + "key_path": "path/to/key/file", + } + }, + }, + mock_client_cert_source, + ), + ( + # With workloads not present in config, mTLS is disabled. + { + "version": 1, + "cert_configs": {}, + }, + None, + ), + ] + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + for config_data, expected_cert_source in test_cases: + env = os.environ.copy() + env.pop("GOOGLE_API_USE_CLIENT_CERTIFICATE", None) + with mock.patch.dict(os.environ, env, clear=True): + config_filename = "mock_certificate_config.json" + config_file_content = json.dumps(config_data) + m = mock.mock_open(read_data=config_file_content) + with mock.patch("builtins.open", m): + with mock.patch.dict( + os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": config_filename} + ): + mock_api_endpoint = "foo" + options = client_options.ClientOptions( + client_cert_source=mock_client_cert_source, + api_endpoint=mock_api_endpoint, + ) + ( + api_endpoint, + cert_source, + ) = client_class.get_mtls_endpoint_and_cert_source(options) + assert api_endpoint == mock_api_endpoint + assert cert_source is expected_cert_source + + # Test cases for mTLS enablement when GOOGLE_API_USE_CLIENT_CERTIFICATE is unset(empty). + test_cases = [ + ( + # With workloads present in config, mTLS is enabled. + { + "version": 1, + "cert_configs": { + "workload": { + "cert_path": "path/to/cert/file", + "key_path": "path/to/key/file", + } + }, + }, + mock_client_cert_source, + ), + ( + # With workloads not present in config, mTLS is disabled. + { + "version": 1, + "cert_configs": {}, + }, + None, + ), + ] + if hasattr(google.auth.transport.mtls, "should_use_client_cert"): + for config_data, expected_cert_source in test_cases: + env = os.environ.copy() + env.pop("GOOGLE_API_USE_CLIENT_CERTIFICATE", "") + with mock.patch.dict(os.environ, env, clear=True): + config_filename = "mock_certificate_config.json" + config_file_content = json.dumps(config_data) + m = mock.mock_open(read_data=config_file_content) + with mock.patch("builtins.open", m): + with mock.patch.dict( + os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": config_filename} + ): + mock_api_endpoint = "foo" + options = client_options.ClientOptions( + client_cert_source=mock_client_cert_source, + api_endpoint=mock_api_endpoint, + ) + ( + api_endpoint, + cert_source, + ) = client_class.get_mtls_endpoint_and_cert_source(options) + assert api_endpoint == mock_api_endpoint + assert cert_source is expected_cert_source + # Test the case GOOGLE_API_USE_MTLS_ENDPOINT is "never". with mock.patch.dict(os.environ, {"GOOGLE_API_USE_MTLS_ENDPOINT": "never"}): api_endpoint, cert_source = client_class.get_mtls_endpoint_and_cert_source() @@ -783,18 +991,6 @@ def test_storage_client_get_mtls_endpoint_and_cert_source(client_class): == "Environment variable `GOOGLE_API_USE_MTLS_ENDPOINT` must be `never`, `auto` or `always`" ) - # Check the case GOOGLE_API_USE_CLIENT_CERTIFICATE has unsupported value. 
-    with mock.patch.dict(
-        os.environ, {"GOOGLE_API_USE_CLIENT_CERTIFICATE": "Unsupported"}
-    ):
-        with pytest.raises(ValueError) as excinfo:
-            client_class.get_mtls_endpoint_and_cert_source()
-
-        assert (
-            str(excinfo.value)
-            == "Environment variable `GOOGLE_API_USE_CLIENT_CERTIFICATE` must be either `true` or `false`"
-        )
-
 
 @pytest.mark.parametrize("client_class", [StorageClient, StorageAsyncClient])
 @mock.patch.object(
diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py
index f3b6da5d1..cbf53b398 100644
--- a/tests/unit/test_blob.py
+++ b/tests/unit/test_blob.py
@@ -3049,7 +3049,14 @@ def test__initiate_resumable_upload_with_client_custom_headers(self):
         self._initiate_resumable_helper(client=client)
 
     def _make_resumable_transport(
-        self, headers1, headers2, headers3, total_bytes, data_corruption=False
+        self,
+        headers1,
+        headers2,
+        headers3,
+        total_bytes,
+        data_corruption=False,
+        md5_checksum_value=None,
+        crc32c_checksum_value=None,
     ):
         fake_transport = mock.Mock(spec=["request"])
 
@@ -3057,7 +3064,7 @@ def _make_resumable_transport(
         fake_response2 = self._mock_requests_response(
             http.client.PERMANENT_REDIRECT, headers2
         )
-        json_body = f'{{"size": "{total_bytes:d}"}}'
+        json_body = json.dumps({"size": str(total_bytes), "md5Hash": md5_checksum_value, "crc32c": crc32c_checksum_value})
         if data_corruption:
             fake_response3 = DataCorruption(None)
         else:
@@ -3151,6 +3158,9 @@ def _do_resumable_upload_call2(
         if_metageneration_match=None,
         if_metageneration_not_match=None,
         timeout=None,
+        checksum=None,
+        crc32c_checksum_value=None,
+        md5_checksum_value=None,
     ):
         # Third mock transport.request() does sends last chunk.
         content_range = f"bytes {blob.chunk_size:d}-{total_bytes - 1:d}/{total_bytes:d}"
@@ -3161,6 +3171,11 @@ def _do_resumable_upload_call2(
             "content-type": content_type,
             "content-range": content_range,
         }
+        if checksum == "crc32c":
+            expected_headers["x-goog-hash"] = f"crc32c={crc32c_checksum_value}"
+        elif checksum == "md5":
+            expected_headers["x-goog-hash"] = f"md5={md5_checksum_value}"
+
         payload = data[blob.chunk_size :]
         return mock.call(
             "PUT",
@@ -3181,12 +3196,17 @@ def _do_resumable_helper(
         timeout=None,
         data_corruption=False,
         retry=None,
+        checksum=None,  # None is also a valid value; it disables checksum validation.
    ):
         CHUNK_SIZE = 256 * 1024
         USER_AGENT = "testing 1.2.3"
         content_type = "text/html"
         # Data to be uploaded.
         data = b"" + (b"A" * CHUNK_SIZE) + b""
+
+        # Checksum values calculated offline and entered here (unit-test best practice).
+        crc32c_checksum_value = "mQ30hg=="
+        md5_checksum_value = "wajHeg1f2Q2u9afI6fjPOw=="
         total_bytes = len(data)
         if use_size:
             size = total_bytes
@@ -3213,6 +3233,8 @@ def _do_resumable_helper(
             headers3,
             total_bytes,
             data_corruption=data_corruption,
+            md5_checksum_value=md5_checksum_value,
+            crc32c_checksum_value=crc32c_checksum_value,
         )
 
         # Create some mock arguments and call the method under test. 
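The hard-coded digests above can be reproduced offline; a sketch, assuming the `google-crc32c` package (already a dependency of this library) and base64 encoding of the raw digests, matching the `x-goog-hash` header values asserted in `_do_resumable_upload_call2`:

import base64
import hashlib

import google_crc32c

data = b"A" * (256 * 1024)  # same payload that _do_resumable_helper builds

# Expected to print the constants used in the test:
# crc32c "mQ30hg==" and md5 "wajHeg1f2Q2u9afI6fjPOw==".
print(base64.b64encode(google_crc32c.Checksum(data).digest()).decode())
print(base64.b64encode(hashlib.md5(data).digest()).decode())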
@@ -3247,7 +3269,7 @@ def _do_resumable_helper( if_generation_not_match, if_metageneration_match, if_metageneration_not_match, - checksum=None, + checksum=checksum, retry=retry, **timeout_kwarg, ) @@ -3296,6 +3318,9 @@ def _do_resumable_helper( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=expected_timeout, + checksum=checksum, + crc32c_checksum_value=crc32c_checksum_value, + md5_checksum_value=md5_checksum_value, ) self.assertEqual(transport.request.mock_calls, [call0, call1, call2]) @@ -3308,6 +3333,12 @@ def test__do_resumable_upload_no_size(self): def test__do_resumable_upload_with_size(self): self._do_resumable_helper(use_size=True) + def test__do_resumable_upload_with_size_with_crc32c_checksum(self): + self._do_resumable_helper(use_size=True, checksum="crc32c") + + def test__do_resumable_upload_with_size_with_md5_checksum(self): + self._do_resumable_helper(use_size=True, checksum="md5") + def test__do_resumable_upload_with_retry(self): self._do_resumable_helper(retry=DEFAULT_RETRY) diff --git a/tests/unit/test_bucket.py b/tests/unit/test_bucket.py index 809b572e0..850e89d04 100644 --- a/tests/unit/test_bucket.py +++ b/tests/unit/test_bucket.py @@ -18,6 +18,7 @@ import mock import pytest +from google.cloud.storage.blob import _quote from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED @@ -2320,6 +2321,37 @@ def test_move_blob_w_no_retry_timeout_and_generation_match(self): _target_object=new_blob, ) + def test_move_blob_needs_url_encoding(self): + source_name = "source" + blob_name = "blob-name" + new_name = "new/name" + api_response = {} + client = mock.Mock(spec=["_post_resource"]) + client._post_resource.return_value = api_response + source = self._make_one(client=client, name=source_name) + blob = self._make_blob(source_name, blob_name) + + new_blob = source.move_blob( + blob, new_name, if_generation_match=0, retry=None, timeout=30 + ) + + self.assertIs(new_blob.bucket, source) + self.assertEqual(new_blob.name, new_name) + + expected_path = "/b/{}/o/{}/moveTo/o/{}".format( + source_name, blob_name, _quote(new_name) + ) + expected_data = None + expected_query_params = {"ifGenerationMatch": 0} + client._post_resource.assert_called_once_with( + expected_path, + expected_data, + query_params=expected_query_params, + timeout=30, + retry=None, + _target_object=new_blob, + ) + def test_move_blob_w_user_project(self): source_name = "source" blob_name = "blob-name"