File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/storage/wildcard_iterator.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for expanding wildcarded GCS pathnames."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import abc
import collections
import fnmatch
import heapq
import os
import pathlib
import re
from typing import Iterator
from googlecloudsdk.api_lib.storage import api_factory
from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.api_lib.storage import errors as api_errors
from googlecloudsdk.api_lib.storage import request_config_factory
from googlecloudsdk.command_lib.storage import errors as command_errors
from googlecloudsdk.command_lib.storage import folder_util
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.storage.resources import resource_reference
from googlecloudsdk.core import log
from googlecloudsdk.core.util import debug_output
import six
_FILES_ONLY_ERROR_FORMAT = 'Expected files but got stream: {}'
COMPRESS_WILDCARDS_REGEX = re.compile(r'\*{3,}')
WILDCARD_REGEX = re.compile(r'[*?\[\]]')
_RELATIVE_PATH_SYMBOLS = frozenset(['.', '.' + os.sep, '..', '..' + os.sep])
def _is_hidden(path):
return path.rpartition(os.sep)[2].startswith('.')
def contains_wildcard(url_string):
"""Checks whether url_string contains a wildcard.
Args:
url_string: URL string to check.
Returns:
bool indicator.
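Example (illustrative; the URLs are made up):
  contains_wildcard('gs://bucket/*.txt')  # -> True
  contains_wildcard('gs://bucket/a.txt')  # -> False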
"""
return bool(WILDCARD_REGEX.search(url_string))
def get_wildcard_iterator(
url_str,
error_on_missing_key=True,
exclude_patterns=None,
fetch_encrypted_object_hashes=False,
fields_scope=cloud_api.FieldsScope.NO_ACL,
files_only=False,
force_include_hidden_files=False,
get_bucket_metadata=False,
halt_on_empty_response=True,
ignore_symlinks=False,
managed_folder_setting=folder_util.ManagedFolderSetting.DO_NOT_LIST,
folder_setting=folder_util.FolderSetting.DO_NOT_LIST,
next_page_token=None,
object_state=cloud_api.ObjectState.LIVE,
preserve_symlinks=False,
raise_managed_folder_precondition_errors=False,
soft_deleted_buckets=False,
list_filter=None,
):
"""Instantiate a WildcardIterator for the given URL string.
Args:
url_str (str): URL string which may contain wildcard characters.
error_on_missing_key (bool): If true, and the encryption key needed to
decrypt an object is missing, the iterator raises an error for that
object.
exclude_patterns (Patterns|None): Don't return resources whose URLs or local
file paths match these regex patterns.
fetch_encrypted_object_hashes (bool): Fall back to GET requests for
encrypted cloud objects in order to fetch their hash values.
fields_scope (cloud_api.FieldsScope): Determines amount of metadata returned
by API.
files_only (bool): Skips containers. Raises an error for stream types.
Still returns symlinks.
force_include_hidden_files (bool): Include local hidden files even if the
iteration is not recursive. The URL should be for a directory or a
directory followed by wildcards.
get_bucket_metadata (bool): If true, perform a bucket GET request when
fetching bucket resources.
halt_on_empty_response (bool): Stops querying after empty list response. See
CloudApi for details.
ignore_symlinks (bool): Skip over symlinks instead of following them.
managed_folder_setting (folder_util.ManagedFolderSetting): Indicates how to
deal with managed folders.
folder_setting (folder_util.FolderSetting): Indicates how to deal with
folders.
next_page_token (str|None): Used to resume LIST calls.
object_state (cloud_api.ObjectState): Versions of objects to query.
preserve_symlinks (bool): Preserve symlinks instead of following them.
raise_managed_folder_precondition_errors (bool): If True, raises
precondition errors from managed folder listing. Otherwise, suppresses
these errors. This is helpful in commands that list managed folders by
default.
soft_deleted_buckets (bool): If true, soft deleted buckets will be queried.
list_filter (str|None): If provided, objects with matching contexts will
be returned. Prefixes are still returned regardless of whether they
match the specified filter. See go/gcs-object-context-filtering for
more details.
Returns:
A WildcardIterator object.
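Example:
  A minimal usage sketch; 'gs://bucket/dir/*.txt' is a made-up URL:
    for resource in get_wildcard_iterator('gs://bucket/dir/*.txt'):
      log.info(resource.storage_url.url_string)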
"""
url = storage_url.storage_url_from_string(url_str)
if isinstance(url, storage_url.CloudUrl):
return CloudWildcardIterator(
url,
error_on_missing_key=error_on_missing_key,
exclude_patterns=exclude_patterns,
fetch_encrypted_object_hashes=fetch_encrypted_object_hashes,
fields_scope=fields_scope,
files_only=files_only,
get_bucket_metadata=get_bucket_metadata,
halt_on_empty_response=halt_on_empty_response,
managed_folder_setting=managed_folder_setting,
folder_setting=folder_setting,
next_page_token=next_page_token,
object_state=object_state,
raise_managed_folder_precondition_errors=raise_managed_folder_precondition_errors,
soft_deleted_buckets=soft_deleted_buckets,
list_filter=list_filter,
)
elif isinstance(url, storage_url.FileUrl):
return FileWildcardIterator(
url,
exclude_patterns=exclude_patterns,
files_only=files_only,
force_include_hidden_files=force_include_hidden_files,
ignore_symlinks=ignore_symlinks,
preserve_symlinks=preserve_symlinks,
)
else:
raise command_errors.InvalidUrlError('Unknown url type %s.' % url)
def _compress_url_wildcards(url):
"""Asterisk counts greater than two treated as single * to mimic globs.
Args:
url (StorageUrl): Url to compress wildcards in.
Returns:
StorageUrl built from string with compressed wildcards.
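Example (illustrative):
  A URL string like 'gs://b/****/obj' is compressed to 'gs://b/*/obj',
  while 'gs://b/**/obj' is left unchanged.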
"""
compressed_url_string = re.sub(COMPRESS_WILDCARDS_REGEX, '*',
url.versionless_url_string)
if url.generation is not None:
compressed_url_string += '#' + url.generation
return storage_url.storage_url_from_string(compressed_url_string)
class WildcardIterator(six.with_metaclass(abc.ABCMeta)):
"""Class for iterating over Google Cloud Storage strings containing wildcards.
The base class is abstract; you should instantiate using the
get_wildcard_iterator() factory function, which chooses the right
implementation depending on the URL string.
"""
def __init__(
self,
url,
exclude_patterns=None,
files_only=False,
):
"""Initializes class. See get_wildcard_iterator for Args docstring."""
self._url = _compress_url_wildcards(url)
self._exclude_patterns = exclude_patterns
self._files_only = files_only
def __repr__(self):
"""Returns string representation of WildcardIterator."""
return 'WildcardIterator(%s)' % getattr(self._url, 'url_string', None)
class FileWildcardIterator(WildcardIterator):
"""Class to iterate over files and directories."""
def __init__(
self,
url,
exclude_patterns=None,
files_only=False,
force_include_hidden_files=False,
ignore_symlinks=False,
preserve_symlinks=False,
):
"""Initialize FileWildcardIterator instance.
Args:
url (FileUrl): A FileUrl instance representing a file path.
exclude_patterns (Patterns|None): See get_wildcard_iterator.
files_only (bool): Returns files and symlinks, skips folders, errors on
streams.
force_include_hidden_files (bool): Include hidden files even if the
iteration is not recursive. The URL should be for a directory or a
directory followed by wildcards.
ignore_symlinks (bool): Skip over symlinks instead of following them.
preserve_symlinks (bool): Preserve symlinks instead of following them.
"""
super(FileWildcardIterator, self).__init__(
url,
exclude_patterns,
files_only=files_only,
)
self._ignore_symlinks = ignore_symlinks
self._preserve_symlinks = preserve_symlinks
if (
force_include_hidden_files
and url.resource_name.rstrip('*')[-1] != os.sep
):
raise command_errors.InvalidUrlError(
'If force-including hidden files, the input URL must be a directory or'
' a directory followed by wildcards.'
)
self._path = self._url.resource_name
self._recurse = '**' in self._path
self._include_hidden_files = (
self._recurse or force_include_hidden_files or _is_hidden(self._path)
)
def __iter__(self):
# Files named '-' will not be copied, as that string makes is_stdio true.
if self._url.is_stdio:
if self._files_only:
raise command_errors.InvalidUrlError(
_FILES_ONLY_ERROR_FORMAT.format(self._url.resource_name)
)
yield resource_reference.FileObjectResource(self._url)
return
if self._path in _RELATIVE_PATH_SYMBOLS:
# Otherwise copies involving relative paths raise pathlib errors:
# b/289221450.
yield resource_reference.FileDirectoryResource(self._url)
return
pathlib_path = pathlib.Path(self._path).expanduser()
if pathlib_path.root:
# It's a path that starts with a root. Create the glob pattern relative
# to the root dir. Ex: /usr/a/b/c => (usr, a, b, c)
path_components_relative_to_root = list(pathlib_path.parts[1:])
path_relative_to_root = os.path.join(*path_components_relative_to_root)
root = pathlib_path.anchor
else:
root = '.'
path_relative_to_root = self._path
if path_relative_to_root.endswith('**'):
path_relative_to_root = os.path.join(path_relative_to_root, '*')
# Pathlib removes leading './' from paths, but the upload path completion
# assumes that the expanded path contains './' if the user's path does.
current_working_directory_prefix = '.' + os.sep
if self._path.startswith(current_working_directory_prefix):
path_prefix = current_working_directory_prefix
else:
path_prefix = ''
path_iterator = (
path_prefix + str(p)
for p in pathlib.Path(root).glob(path_relative_to_root)
)
for path in path_iterator:
if (self._exclude_patterns and self._exclude_patterns.match(path)) or (
not self._include_hidden_files and _is_hidden(path)
):
continue
if self._files_only and not os.path.isfile(path):
if storage_url.is_named_pipe(path):
raise command_errors.InvalidUrlError(
_FILES_ONLY_ERROR_FORMAT.format(self._url.resource_name)
)
continue
# Follow symlinks unless they point to a directory or symlinks are being
# ignored. However, include even directory symlinks (as files) when
# symlinks are being preserved.
is_symlink = os.path.islink(path)
if (
is_symlink
and not self._preserve_symlinks
and (os.path.isdir(path) or self._ignore_symlinks)
):
log.warning('Skipping symlink {}'.format(path))
continue
# For pattern like foo/bar/**, glob returns first path as 'foo/bar/'
# even when foo/bar does not exist. So we skip non-existing paths.
# Glob also returns intermediate directories if called with **. We skip
# them to be consistent with CloudWildcardIterator. Preserved directory
# symlinks, however, should not be skipped.
if (
self._path.endswith('**')
and not (is_symlink and self._preserve_symlinks)
and (not os.path.exists(path) or os.path.isdir(path))
):
continue
file_url = storage_url.FileUrl(path)
if not is_symlink and os.path.isdir(path):
yield resource_reference.FileDirectoryResource(file_url)
elif is_symlink and self._preserve_symlinks:
yield resource_reference.FileSymlinkPlaceholderResource(file_url)
else:
yield resource_reference.FileObjectResource(
file_url, is_symlink=is_symlink
)
class CloudWildcardIterator(WildcardIterator):
"""Class to iterate over Cloud Storage strings containing wildcards."""
def __init__(
self,
url,
error_on_missing_key=True,
exclude_patterns=None,
fetch_encrypted_object_hashes=False,
fields_scope=cloud_api.FieldsScope.NO_ACL,
files_only=False,
get_bucket_metadata=False,
halt_on_empty_response=True,
managed_folder_setting=folder_util.ManagedFolderSetting.DO_NOT_LIST,
folder_setting=folder_util.FolderSetting.DO_NOT_LIST,
next_page_token=None,
object_state=cloud_api.ObjectState.LIVE,
raise_managed_folder_precondition_errors=True,
soft_deleted_buckets=False,
list_filter=None,
):
"""Instantiates an iterator that matches the wildcard URL.
Args:
url (CloudUrl): CloudUrl that may contain wildcard that needs expansion.
error_on_missing_key (bool): If true, and the encryption key needed to
decrypt an object is missing, the iterator raises an error for that
object.
exclude_patterns (Patterns|None): See get_wildcard_iterator.
fetch_encrypted_object_hashes (bool): Fall back to GET requests for
encrypted objects in order to fetch their hash values.
fields_scope (cloud_api.FieldsScope): Determines amount of metadata
returned by API.
files_only (bool): Returns cloud objects, not prefixes or buckets. Also
skips directory placeholder objects, although they are technically
objects.
get_bucket_metadata (bool): If true, perform a bucket GET request when
fetching bucket resources. Otherwise, bucket URLs without wildcards may
be returned without verifying the buckets exist.
halt_on_empty_response (bool): Stops querying after empty list response.
See CloudApi for details.
managed_folder_setting (folder_util.ManagedFolderSetting): Indicates how
to deal with managed folders.
folder_setting (folder_util.FolderSetting): Indicates how to deal with
folders.
next_page_token (str|None): Used to resume LIST calls.
object_state (cloud_api.ObjectState): Versions of objects to query.
raise_managed_folder_precondition_errors (bool): If True, raises
precondition errors from managed folder listing. Otherwise, suppresses
these errors. This is helpful in commands that list managed folders by
default.
soft_deleted_buckets (bool): If true, soft deleted buckets will be
queried.
list_filter (str|None): If provided, objects with matching
contexts will be returned. Prefixes are still returned
regardless of whether they match the specified context. See
go/gcs-object-context-filtering for more details.
"""
super(CloudWildcardIterator, self).__init__(
url, exclude_patterns=exclude_patterns, files_only=files_only
)
self._client = api_factory.get_api(self._url.scheme)
self._error_on_missing_key = error_on_missing_key
self._fetch_encrypted_object_hashes = fetch_encrypted_object_hashes
self._fields_scope = fields_scope
self._get_bucket_metadata = get_bucket_metadata
self._halt_on_empty_response = halt_on_empty_response
self._managed_folder_setting = managed_folder_setting
self._folder_setting = folder_setting
self._next_page_token = next_page_token
self._object_state = object_state
self._raise_managed_folder_precondition_errors = (
raise_managed_folder_precondition_errors
)
self._soft_deleted_buckets = soft_deleted_buckets
self._list_filter = list_filter
if (
object_state is cloud_api.ObjectState.LIVE
and self._url.generation is not None
):
self._object_state_for_listing = cloud_api.ObjectState.LIVE_AND_NONCURRENT
else:
self._object_state_for_listing = object_state
self._soft_deleted = object_state is cloud_api.ObjectState.SOFT_DELETED
self._object_state_requires_expansion = (
self._object_state is cloud_api.ObjectState.LIVE_AND_NONCURRENT
or (self._soft_deleted and self._url.generation is None)
)
def __iter__(self):
if self._files_only and (self._url.is_provider() or self._url.is_bucket()):
return
if self._url.is_provider():
for bucket_resource in self._client.list_buckets(
fields_scope=self._fields_scope,
soft_deleted=self._soft_deleted_buckets,
):
yield bucket_resource
else:
for bucket_or_unknown_resource in self._fetch_buckets():
if self._url.is_bucket():
yield bucket_or_unknown_resource
else: # URL is an object or prefix.
# We may have to fetch a bucket's metadata again because we do not
# always list the full scope of fields in the steps above. We may not
# want to do that even now, to avoid extra information being printed
# anywhere in the output.
is_hns_bucket = self._is_hns_bucket(
bucket_or_unknown_resource.storage_url.bucket_name
)
for resource in self._fetch_sub_bucket_resources(
bucket_or_unknown_resource.storage_url.bucket_name,
is_hns_bucket=is_hns_bucket,
):
if self._exclude_patterns and self._exclude_patterns.match(
resource.storage_url.versionless_url_string
):
continue
if self._files_only and (
not isinstance(resource, resource_reference.ObjectResource)
or ( # Directory placeholder object.
resource.storage_url.resource_name.endswith(
storage_url.CLOUD_URL_DELIMITER
)
and resource.size == 0
)
):
continue
if (
self._managed_folder_setting
is folder_util.ManagedFolderSetting.LIST_WITHOUT_OBJECTS
and not isinstance(
resource, resource_reference.ManagedFolderResource
)
):
continue
if (
self._folder_setting
is folder_util.FolderSetting.LIST_WITHOUT_OBJECTS
and not isinstance(resource, resource_reference.FolderResource)
):
continue
yield resource
def _decrypt_resource_if_necessary(self, resource):
if (
self._fetch_encrypted_object_hashes
and cloud_api.Capability.ENCRYPTION in self._client.capabilities
and self._fields_scope != cloud_api.FieldsScope.SHORT
and isinstance(resource, resource_reference.ObjectResource)
and not (resource.crc32c_hash or resource.md5_hash)
):
# LIST won't return GCS hash fields. Need to GET.
if resource.kms_key:
# Backend will reject if user does not have KMS encryption permissions.
return self._client.get_object_metadata(
resource.bucket,
resource.name,
generation=self._url.generation,
fields_scope=self._fields_scope,
soft_deleted=self._soft_deleted,
)
if resource.decryption_key_hash_sha256:
request_config = request_config_factory.get_request_config(
resource.storage_url,
decryption_key_hash_sha256=resource.decryption_key_hash_sha256,
error_on_missing_key=self._error_on_missing_key)
if getattr(request_config.resource_args, 'decryption_key', None):
# Don't GET unless we have a key that will decrypt object.
return self._client.get_object_metadata(
resource.bucket,
resource.name,
request_config,
generation=self._url.generation,
fields_scope=self._fields_scope,
soft_deleted=self._soft_deleted,
)
# No decryption necessary or don't have proper key.
return resource
def _try_getting_object_directly(self, bucket_name):
"""Matches user input that doesn't need expansion."""
try:
resource = self._client.get_object_metadata(
bucket_name,
self._url.resource_name,
# TODO(b/197754758): add user request args from surface.
request_config_factory.get_request_config(self._url),
generation=self._url.generation,
fields_scope=self._fields_scope,
soft_deleted=self._soft_deleted,
)
return self._decrypt_resource_if_necessary(resource)
except api_errors.NotFoundError:
# Object does not exist. Could be a prefix.
pass
except api_errors.GcsApiError as e:
# GET with soft-deleted objects requires generation.
if (
e.status_code == 400
and 'You must specify a generation' in str(e)
and self._url.url_string.endswith(self._url.delimiter)
and self._soft_deleted
):
log.debug(
'GET failed with "must specify generation" error. This is'
' expected for a soft-deleted object listed with a trailing'
' slash. Falling back to a LIST call.'
)
pass
else:
raise
return None
def _fetch_sub_bucket_resources(self, bucket_name, is_hns_bucket=False):
"""Fetch all objects for the given bucket that match the URL."""
needs_further_expansion = (
contains_wildcard(self._url.resource_name)
or self._object_state_requires_expansion
or self._url.url_string.endswith(self._url.delimiter)
)
if not needs_further_expansion:
# Assume that the URL represents a single object.
direct_query_result = self._try_getting_object_directly(bucket_name)
if direct_query_result:
return [direct_query_result]
# Will run if direct check found no result.
return self._expand_object_path(bucket_name, is_hns_bucket)
def _get_managed_folder_iterator(self, bucket_name, wildcard_parts):
# Listing all objects under a prefix (recursive listing) occurs when
# `delimiter` is None. `list_managed_folders` does not support delimiters,
# so this is the only circumstance where it's safe to call.
is_recursive_expansion = wildcard_parts.delimiter is None
should_list_managed_folders = self._managed_folder_setting in (
folder_util.ManagedFolderSetting.LIST_WITH_OBJECTS,
folder_util.ManagedFolderSetting.LIST_WITHOUT_OBJECTS,
)
try:
if (
should_list_managed_folders
and cloud_api.Capability.MANAGED_FOLDERS in self._client.capabilities
and is_recursive_expansion
):
managed_folder_iterator = self._client.list_managed_folders(
bucket_name=bucket_name, prefix=wildcard_parts.prefix or None
)
else:
managed_folder_iterator = []
for resource in managed_folder_iterator:
yield resource
except api_errors.PreconditionFailedError:
if self._raise_managed_folder_precondition_errors:
raise
def _get_folder_iterator(
self, bucket_name, wildcard_parts, is_hns_bucket=False
):
is_recursive_expansion = wildcard_parts.delimiter is None
is_list_as_folders = (
self._folder_setting is folder_util.FolderSetting.LIST_AS_FOLDERS
and is_hns_bucket
)
should_list_folders = (
self._folder_setting
in (folder_util.FolderSetting.LIST_WITHOUT_OBJECTS,)
or is_list_as_folders
)
if wildcard_parts.prefix:
modified_prefix = (
wildcard_parts.prefix + '/'
if not wildcard_parts.prefix.endswith('/')
else wildcard_parts.prefix
)
else:
modified_prefix = None
if (
should_list_folders
and cloud_api.Capability.FOLDERS in self._client.capabilities
and is_recursive_expansion
):
folder_iterator = self._client.list_folders(
bucket_name=bucket_name,
prefix=modified_prefix,
)
else:
folder_iterator = []
for resource in folder_iterator:
yield resource
def _get_resource_iterator(
self, bucket_name, wildcard_parts, is_hns_bucket=False
):
if (
(
self._managed_folder_setting
is not folder_util.ManagedFolderSetting.LIST_WITHOUT_OBJECTS
and self._folder_setting
is not folder_util.FolderSetting.LIST_WITHOUT_OBJECTS
)
# Even if we're just listing managed folders/folders, we need to call
# list_objects to expand non-recursive wildcards using delimiters. For
# example, to expand gs://bucket/*/dir/**, we will call list_objects to
# get PrefixResources needed to expand the first wildcard. After all
# wildcards in the prefix are expanded, wildcard_parts.delimiter will be
# None, and we will skip this call.
or wildcard_parts.delimiter
):
# If we are using managed folders at all, we need to include them as
# prefixes so that wildcard expansion works appropriately.
setting_is_do_not_list = (
self._managed_folder_setting
is folder_util.ManagedFolderSetting.DO_NOT_LIST
and self._folder_setting is folder_util.FolderSetting.DO_NOT_LIST
)
# The API raises an error if we attempt to include folders as prefixes
# and do not specify a delimiter.
uses_delimiter = bool(wildcard_parts.delimiter)
include_folders_as_prefixes = (
None if setting_is_do_not_list or not uses_delimiter else True
)
# TODO(b/299973762): Allow the list_objects API method to only yield
# prefixes if we want managed folders without objects.
object_iterator = self._client.list_objects(
bucket_name=bucket_name,
delimiter=wildcard_parts.delimiter,
fields_scope=self._fields_scope,
halt_on_empty_response=self._halt_on_empty_response,
include_folders_as_prefixes=include_folders_as_prefixes,
next_page_token=self._next_page_token,
prefix=wildcard_parts.prefix or None,
object_state=self._object_state_for_listing,
list_filter=self._list_filter,
)
else:
object_iterator = []
managed_folder_iterator = self._get_managed_folder_iterator(
bucket_name, wildcard_parts
)
folder_iterator = self._get_folder_iterator(
bucket_name, wildcard_parts, is_hns_bucket
)
return heapq.merge(
object_iterator,
managed_folder_iterator,
folder_iterator,
key=lambda resource: resource.storage_url.url_string,
)
def _maybe_convert_prefix_to_managed_folder(self, resource):
"""If resource is a prefix, attempts to convert it to a managed folder."""
if (
# pylint: disable=unidiomatic-typecheck
# We do not want this check to pass for child classes.
type(resource) is not resource_reference.PrefixResource
# pylint: enable=unidiomatic-typecheck
or self._managed_folder_setting
not in {
folder_util.ManagedFolderSetting.LIST_WITH_OBJECTS,
folder_util.ManagedFolderSetting.LIST_WITHOUT_OBJECTS,
}
or cloud_api.Capability.MANAGED_FOLDERS not in self._client.capabilities
):
return resource
try:
prefix_url = resource.storage_url
return self._client.get_managed_folder(
prefix_url.bucket_name, prefix_url.resource_name
)
except api_errors.NotFoundError:
return resource
def _maybe_convert_prefix_to_folder(self, resource, is_hns_bucket=False):
"""If resource is a prefix, attempts to convert it to a folder."""
if (
# pylint: disable=unidiomatic-typecheck
# We do not want this check to pass for child classes.
type(resource) is not resource_reference.PrefixResource
# pylint: enable=unidiomatic-typecheck
or self._folder_setting
not in {
folder_util.FolderSetting.LIST_WITHOUT_OBJECTS,
folder_util.FolderSetting.LIST_AS_FOLDERS,
}
or cloud_api.Capability.FOLDERS not in self._client.capabilities
):
return resource
if (
self._folder_setting is folder_util.FolderSetting.LIST_AS_FOLDERS
and not is_hns_bucket
):
return resource
try:
prefix_url = resource.storage_url
return self._client.get_folder(
prefix_url.bucket_name, prefix_url.resource_name
)
except api_errors.NotFoundError:
return resource
def _expand_object_path(self, bucket_name, is_hns_bucket=False):
"""Expands object names.
Args:
bucket_name (str): Name of the bucket.
is_hns_bucket (bool): Whether the bucket is an HNS bucket.
Yields:
resource_reference.Resource objects where each resource can be
an ObjectResource object or a PrefixResource object.
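Example (illustrative, with the made-up name 'a/b*/c*.txt'): the first
iteration lists with prefix 'a/b' and delimiter '/', keeps resources
matching 'a/b*', and queues names like 'a/b1/c*.txt' for the next
iteration.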
"""
original_object_name = self._url.resource_name
if original_object_name.endswith(self._url.delimiter):
if not contains_wildcard(self._url.resource_name):
# Get object with trailing slash in addition to prefix check below.
direct_query_result = self._try_getting_object_directly(bucket_name)
if direct_query_result:
yield direct_query_result
# Force API to return prefix resource not the prefix's contents.
object_name = storage_url.rstrip_one_delimiter(original_object_name)
else:
object_name = original_object_name
names_needing_expansion = collections.deque([object_name])
error = None
while names_needing_expansion:
name = names_needing_expansion.popleft()
# Parse out the prefix, delimiter, filter_pattern and suffix.
# Given a string 'a/b*c/d/e*f/g.txt', this will return
# CloudWildcardParts(prefix='a/b', filter_pattern='*c',
# delimiter='/', suffix='d/e*f/g.txt')
wildcard_parts = CloudWildcardParts.from_string(name, self._url.delimiter)
# Fetch all the objects and prefixes.
resource_iterator = self._get_resource_iterator(
bucket_name, wildcard_parts, is_hns_bucket
)
# We have all the objects and prefixes that matched wildcard_parts.prefix.
# Use filter_pattern to eliminate non-matching objects and prefixes.
filtered_resources = self._filter_resources(
resource_iterator,
wildcard_parts.prefix + wildcard_parts.filter_pattern,
)
for resource in filtered_resources:
resource_path = resource.storage_url.resource_name
if wildcard_parts.suffix:
# pylint: disable=unidiomatic-typecheck
# We do not want this check to pass for child classes.
if type(resource) is resource_reference.PrefixResource:
# pylint: enable=unidiomatic-typecheck
# Suffix is present, which indicates that we have more wildcards to
# expand. Let's say object_name is a/b1c. Then the new string that
# we want to expand will be a/b1c/d/e*f/g.txt
if WILDCARD_REGEX.search(resource_path):
error = command_errors.InvalidUrlError(
'Cloud folders named with wildcards are not supported.'
' API returned {}'.format(resource)
)
else:
names_needing_expansion.append(
resource_path + wildcard_parts.suffix
)
else:
# Make sure a regular object is not returned if the original query was
# for a prefix or an object with a trailing delimiter.
# Needed for gs://b/f*/ to filter out gs://b/f.txt.
if not resource_path.endswith(
self._url.delimiter
) and original_object_name.endswith(self._url.delimiter):
continue
# The order is important, as Folders take precedence over Managed
# Folders for an HNS bucket. So if a resource is a Folder, we need not
# convert it to a Managed Folder.
resource = self._maybe_convert_prefix_to_folder(
resource, is_hns_bucket
)
if not isinstance(resource, resource_reference.FolderResource):
resource = self._maybe_convert_prefix_to_managed_folder(resource)
yield self._decrypt_resource_if_necessary(resource)
if error:
raise error
def _get_regex_patterns(self, wildcard_pattern):
"""Returns list of regex patterns derived from the wildcard patterns.
Args:
wildcard_pattern (str): A wildcard_pattern to filter the resources.
Returns:
List of compiled regex patterns.
This translates the wildcard_pattern and also creates additional
patterns so that ** in a/b/c/**/d.txt is treated as zero or more
folders. This means a/b/c/d.txt will be returned along with
a/b/c/e/f/d.txt.
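Example (illustrative): for 'a/b/**/d.txt', patterns are compiled for
'a/b/**/d.txt', 'a/b/**/d.txt/', and 'a/b/d.txt', so both
'a/b/c/e/d.txt' and 'a/b/d.txt' match.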
"""
# Case 1: The original pattern should always be present.
wildcard_patterns = [wildcard_pattern]
if not wildcard_pattern.endswith(storage_url.CLOUD_URL_DELIMITER):
# Case 2: Allow matching both objects and prefixes with same name.
wildcard_patterns.append(wildcard_pattern +
storage_url.CLOUD_URL_DELIMITER)
if '/**/' in wildcard_pattern:
# Case 3: Will fetch object gs://bucket/dir1/a.txt if pattern is
# gs://bucket/dir1/**/a.txt
updated_pattern = wildcard_pattern.replace('/**/', '/')
wildcard_patterns.append(updated_pattern)
else:
updated_pattern = wildcard_pattern
for pattern in (wildcard_pattern, updated_pattern):
if pattern.startswith('**/'):
# Case 4 (using wildcard_pattern): Will fetch object gs://bucket/a.txt
# if pattern is gs://bucket/**/a.txt. Note that '/**/' will match
# '/a.txt' not 'a.txt'.
# Case 5 (using updated_pattern): Will fetch gs://bucket/dir1/dir2/a.txt
# if the pattern is gs://bucket/**/dir1/**/a.txt
wildcard_patterns.append(pattern[3:])
return [re.compile(fnmatch.translate(p)) for p in wildcard_patterns]
def _filter_resources(self, resource_iterator, wildcard_pattern):
"""Filter out resources that do not match the wildcard_pattern.
Args:
resource_iterator (iterable): An iterable of resource_reference.Resource
objects.
wildcard_pattern (str): The wildcard_pattern to filter the resources.
Yields:
resource_reference.Resource objects matching the wildcard_pattern.
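Example (illustrative): with wildcard_pattern 'a/b*', an object named
'a/bc.txt' and a prefix 'a/b1/' are yielded, while 'a/x.txt' is
filtered out.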
"""
regex_patterns = self._get_regex_patterns(wildcard_pattern)
for resource in resource_iterator:
if (self._url.generation and
resource.storage_url.generation != self._url.generation):
# Filter based on generation, if generation is present in the request.
continue
for regex_pattern in regex_patterns:
if regex_pattern.match(resource.storage_url.resource_name):
yield resource
break
def _fetch_buckets(self):
"""Fetch the bucket(s) corresponding to the url.
Returns:
An iterable of BucketResource or UnknownResource objects.
"""
if contains_wildcard(self._url.bucket_name):
return self._list_buckets_matching_wildcard(self._url.bucket_name)
elif self._url.is_bucket() and self._get_bucket_metadata:
# If --soft-deleted is specified, fetch all soft-deleted generations of
# this bucket. Otherwise, return the live bucket.
if self._soft_deleted_buckets:
return self._fetch_all_soft_deleted_generations_of_bucket(
self._url.bucket_name
)
return [
self._client.get_bucket(
bucket_name=self._url.bucket_name,
fields_scope=self._fields_scope,
)
]
else:
# Avoids API call.
return [resource_reference.UnknownResource(self._url)]
def _fetch_all_soft_deleted_generations_of_bucket(
self, bucket_name: str
) -> Iterator[resource_reference.BucketResource]:
"""Fetch the soft-deleted buckets with the given name.
list_buckets retrieves all versions of a bucket, including
soft-deleted ones. get_bucket retrieves the live bucket, or a specific
soft-deleted version of the bucket if a generation is specified. This is
useful when needing to access a particular deleted version that has been
identified from the list_buckets output.
Args:
bucket_name (str): Bucket name.
Yields:
BucketResource objects.
"""
# TODO: b/350559758 - Add prefix support to list_buckets and use it here.
for bucket_resource in self._client.list_buckets(
fields_scope=self._fields_scope,
soft_deleted=self._soft_deleted_buckets,
):
if bucket_name == bucket_resource.name:
yield bucket_resource
def _list_buckets_matching_wildcard(
self, bucket_name: str
) -> Iterator[resource_reference.BucketResource]:
"""List buckets matching the wildcard pattern.
Args:
bucket_name (str): Bucket name with wildcard.
Yields:
BucketResource objects.
"""
regex = fnmatch.translate(bucket_name)
bucket_pattern = re.compile(regex)
for bucket_resource in self._client.list_buckets(
fields_scope=self._fields_scope,
soft_deleted=self._soft_deleted_buckets,
):
if bucket_pattern.match(bucket_resource.name):
yield bucket_resource
def _is_hns_bucket(self, bucket_name):
if (
self._folder_setting is not folder_util.FolderSetting.LIST_AS_FOLDERS
or cloud_api.Capability.STORAGE_LAYOUT not in self._client.capabilities
):
return False
try:
bucket_layout = self._client.get_storage_layout(bucket_name)
except api_errors.GcsApiError as error:
# GetStorageLayout requires the ListObjects permission to work.
# While in most cases (especially on this code path) the user will have
# the permission, we ideally do not want to fail in the corner case
# where a user lacks the required permission.
if error.payload.status_code != 403:
# Avoids unexpectedly escalating permissions.
raise
return False
return bool(
getattr(bucket_layout, 'hierarchicalNamespace', None)
and bucket_layout.hierarchicalNamespace.enabled
)
class CloudWildcardParts:
"""Different parts of the wildcard string used for querying and filtering."""
def __init__(self, prefix, filter_pattern, delimiter, suffix):
"""Initialize the CloudWildcardParts object.
Args:
prefix (str): The prefix string to be passed to the API request.
This is the substring before the first occurrence of the wildcard.
filter_pattern (str): The pattern used to filter out the results
returned by the list_objects call. This is a substring starting from
the first occurrence of the wildcard up to the first delimiter.
delimiter (str): The delimiter to be passed to the API request.
suffix (str): The substring after the first delimiter in the wildcard.
"""
self.prefix = prefix
self.filter_pattern = filter_pattern
self.delimiter = delimiter
self.suffix = suffix
@classmethod
def from_string(cls, string, delimiter=storage_url.CloudUrl.CLOUD_URL_DELIM):
"""Create a CloudWildcardParts instance from a string.
Args:
string (str): String that needs to be split into different parts.
delimiter (str): The delimiter to be used for splitting the string.
Returns:
CloudWildcardParts object.
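Example (illustrative):
  from_string('a/b*c/d/e*f/g.txt') returns CloudWildcardParts with
  prefix='a/b', filter_pattern='*c', delimiter='/', and
  suffix='d/e*f/g.txt'.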
"""
# Let's assume name => "a/b/c/d*e/f/g*.txt".
# prefix => "a/b/c/d", wildcard_string => "*e/f/g*.txt".
prefix, wildcard_string = _split_on_wildcard(string)
# We expand one level at a time. Hence, splitting on the delimiter.
# filter_pattern => "*e", suffix = "f/g*.txt".
filter_pattern, _, suffix = wildcard_string.partition(delimiter)
if '**' in filter_pattern:
# Fetch all objects for ** pattern. No delimiter is required since we
# want to fetch all the objects here.
delimiter = None
filter_pattern = wildcard_string
# Since we have fetched all the objects, suffix is no longer required.
suffix = None
return cls(prefix, filter_pattern, delimiter, suffix)
def __repr__(self):
return debug_output.generic_repr(self)
def _split_on_wildcard(string):
"""Split the string into two such that first part does not have any wildcard.
Args:
string (str): The string to be split.
Returns:
A 2-tuple where the first part does not contain any wildcard and the
second part does. If no wildcard is found, the second part is empty.
If the string starts with a wildcard, the first part is empty.
For example:
_split_on_wildcard('a/b/c/d*e/f/*.txt') => ('a/b/c/d', '*e/f/*.txt')
_split_on_wildcard('*e/f/*.txt') => ('', '*e/f/*.txt')
_split_on_wildcard('a/b/c/d') => ('a/b/c/d', '')
"""
match = WILDCARD_REGEX.search(string)
if match is None:
return string, ''
first_wildcard_idx = match.start()
prefix = string[:first_wildcard_idx]
wildcard_str = string[first_wildcard_idx:]
return prefix, wildcard_str