File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/storage/tasks/cp/copy_task_iterator.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Task iterator for copy functionality."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import folder_util
from googlecloudsdk.command_lib.storage import manifest_util
from googlecloudsdk.command_lib.storage import path_util
from googlecloudsdk.command_lib.storage import plurality_checkable_iterator
from googlecloudsdk.command_lib.storage import posix_util
from googlecloudsdk.command_lib.storage import progress_callbacks
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.storage import wildcard_iterator
from googlecloudsdk.command_lib.storage.resources import gcs_resource_reference
from googlecloudsdk.command_lib.storage.resources import resource_reference
from googlecloudsdk.command_lib.storage.resources import resource_util
from googlecloudsdk.command_lib.storage.tasks.cp import copy_task_factory
from googlecloudsdk.command_lib.storage.tasks.cp import copy_util
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
_ONE_TB_IN_BYTES = 1099511627776
_RELATIVE_PATH_SYMBOLS = frozenset(['.', '..'])
def _expand_destination_wildcards(destination_string, folders_only=False):
"""Expands destination wildcards.
Ensures that only one resource matches the wildcard expanded string. Much
like the unix cp command, the storage surface only supports copy operations
to one user-specified destination.
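
  For example (illustrative):
    _expand_destination_wildcards('gs://bucket/dir*')
  returns the single matching resource, or raises InvalidUrlError if the
  wildcard matches more than one resource.
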
Args:
destination_string (str): A string representing the destination url.
    folders_only (bool): If True, indicates that we are invoking a folders-only
      copy task.
Returns:
A resource_reference.Resource, or None if no matching resource is found.
Raises:
    InvalidUrlError if more than one resource is matched, or if the destination
    contained an unexpanded wildcard and no resources were matched.
"""
destination_iterator = (
plurality_checkable_iterator.PluralityCheckableIterator(
wildcard_iterator.get_wildcard_iterator(
destination_string,
folder_setting=folder_util.FolderSetting.LIST_AS_FOLDERS
if folders_only
else folder_util.FolderSetting.DO_NOT_LIST,
fields_scope=cloud_api.FieldsScope.SHORT,
)
)
)
if destination_iterator.is_plural():
    # If the result is plural, we would normally raise an error. But we should
    # first check whether this is a case of duplicate results caused by a
    # placeholder folder created through the UI. If it is not, we raise the
    # error and go no further. If it is, we resolve the duplicate and continue
    # with the method as planned.
resolved_resource = _resolve_duplicate_ui_folder_destination(
destination_string, destination_iterator
)
if not resolved_resource:
raise errors.InvalidUrlError(
f'Destination ({destination_string}) must match exactly one URL.'
)
destination_iterator = (
plurality_checkable_iterator.PluralityCheckableIterator(
[resolved_resource]
)
)
contains_unexpanded_wildcard = (
destination_iterator.is_empty()
and wildcard_iterator.contains_wildcard(destination_string)
)
if contains_unexpanded_wildcard:
raise errors.InvalidUrlError(
f'Destination ({destination_string}) contains an unexpected wildcard.'
)
if not destination_iterator.is_empty():
return next(destination_iterator)
def _resolve_duplicate_ui_folder_destination(
destination_string, destination_iterator
):
"""Resolves duplicate resource results for placeholder folders created through the UI.
In the scenario where a user creates a placeholder folder
(which is actually an object ending with a '/' rather than a true folder as in
the case of HNS buckets), the CLI, when resolving for destination gets
two results as part of the ListObjects API call. One of these is of type
GCSObjectResource, while the other is PrefixResource. Technically both results
are correct and expected. But in our logic, we end up interpretting this case
as multiple destinations which we do not support.
This method determines if the given results come under the above scenario.
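
  For example (illustrative), a UI-created placeholder 'gs://bucket/folder/'
  may surface as both a GcsObjectResource and a PrefixResource with the same
  versionless URL; in that case, the PrefixResource is returned.
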
Args:
destination_string (str): A string representing the destination url.
destination_iterator (PluralityCheckableIterator): Contains results from the
destination search through the wildcard iterator.
Returns:
PrefixResource out of the two results of duplicate resources due to UI
folder creation, None otherwise.
"""
  # First, make sure that the destination string is a CloudUrl with the GCS
  # scheme, because this case does not apply to any other type of destination.
destination_storage_url = storage_url.storage_url_from_string(
destination_string
)
if (
not isinstance(destination_storage_url, storage_url.CloudUrl)
or destination_storage_url.scheme != storage_url.ProviderPrefix.GCS
):
return None
destination_resource_1 = next(destination_iterator)
destination_resource_2 = next(destination_iterator)
  # In the case of a folder created through the UI, we expect exactly two
  # resources, never more. If we do encounter more, this is not the scenario
  # of a UI-created folder.
if not destination_iterator.is_empty():
return None
  # The types of the two resources cannot be the same, since we expect a mix
  # of GcsObjectResource and PrefixResource to be returned from the
  # WildcardIterator in the case of UI-created folders.
if isinstance(destination_resource_1, type(destination_resource_2)):
return None
  # At least one of the resources has to be of type GcsObjectResource.
if not (
isinstance(
destination_resource_1, gcs_resource_reference.GcsObjectResource
)
or isinstance(
destination_resource_2, gcs_resource_reference.GcsObjectResource
)
):
return None
  # Once we have determined that at least one of the resources is a
  # GcsObjectResource, we need to ensure that the other is a PrefixResource.
  # If we have two GcsObjectResources, or neither is a PrefixResource, we
  # return None, as this is certainly not a UI-created folder case.
if not (
isinstance(destination_resource_1, resource_reference.PrefixResource)
or isinstance(destination_resource_2, resource_reference.PrefixResource)
):
return None
if (
destination_resource_1.storage_url.versionless_url_string.endswith('/')
and destination_resource_2.storage_url.versionless_url_string.endswith(
'/'
)
) and (
destination_resource_1.storage_url.versionless_url_string
== destination_resource_2.storage_url.versionless_url_string
):
return (
destination_resource_1
if isinstance(destination_resource_1, resource_reference.PrefixResource)
else destination_resource_2
)
return None
def _get_raw_destination(destination_string, folders_only=False):
"""Converts self._destination_string to a destination resource.
Args:
destination_string (str): A string representing the destination url.
    folders_only (bool): If True, indicates that we are invoking a folders-only
      copy task.
Returns:
A resource_reference.Resource. Note that this resource may not be a valid
copy destination if it is a BucketResource, PrefixResource,
FileDirectoryResource or UnknownResource.
Raises:
    InvalidUrlError if the destination URL is a cloud provider or if it
    specifies a version.
"""
destination_url = storage_url.storage_url_from_string(destination_string)
if isinstance(destination_url, storage_url.CloudUrl):
if destination_url.is_provider():
raise errors.InvalidUrlError(
'The cp command does not support provider-only destination URLs.'
)
elif destination_url.generation is not None:
raise errors.InvalidUrlError(
'The destination argument of the cp command cannot be a '
'version-specific URL ({}).'.format(destination_string)
)
raw_destination = _expand_destination_wildcards(
destination_string, folders_only
)
if raw_destination:
return raw_destination
return resource_reference.UnknownResource(destination_url)
def _destination_is_container(destination):
"""Returns True is the destination can be treated as a container.
For a CloudUrl, a container is a bucket or a prefix. If the destination does
not exist, we determine this based on the delimiter.
  For a FileUrl, a container is an existing directory. For a non-existing
  path, we return False.
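
  For example (illustrative), 'gs://bucket' and 'gs://bucket/prefix/' are
  treated as containers, while a FileUrl is a container only if its path is
  an existing directory on disk.
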
Args:
destination (resource_reference.Resource): The destination container.
Returns:
bool: True if destination is a valid container.
"""
try:
if destination.is_container():
return True
except errors.ValueCannotBeDeterminedError:
# Some resource classes are not clearly containers, like objects with names
    # ending in a delimiter. However, we want to treat them as containers
    # anyway so that nesting at copy destinations will work as expected.
pass
destination_url = destination.storage_url
if isinstance(destination_url, storage_url.FileUrl):
# We don't want to treat non-existing file paths as valid containers.
return os.path.isdir(destination_url.resource_name)
return (destination_url.versionless_url_string.endswith(
destination_url.delimiter) or
(isinstance(destination_url, storage_url.CloudUrl) and
destination_url.is_bucket()))
def _resource_is_stream(resource):
"""Checks if a resource points to local pipe-type."""
return (isinstance(resource.storage_url, storage_url.FileUrl) and
resource.storage_url.is_stream)
def _is_expanded_url_valid_parent_dir(expanded_url):
"""Returns True if not FileUrl ending in relative path symbols.
A URL is invalid if it is a FileUrl and the parent directory of the file is a
relative path symbol. Unix will not allow a file itself to be named with a
relative path symbol, but one can be the parent. Notably, "../obj" can lead
to unexpected behavior at the copy destination. We examine the pre-recursion
expanded_url, which might point to "..", to see if the parent is valid.
  If the user does a recursive copy from an expanded URL, it may not end up
  being the final parent of the copied object. For example, see
  "dir/nested_dir/obj".
If you ran "cp -r d* gs://bucket" from the parent of "dir", then the
expanded_url would be "dir", but "nested_dir" would be the parent of "obj".
  This actually doesn't matter since recursion won't add relative path symbols
  to the path. However, we still return whether expanded_url is valid because
  there are cases where we need to copy every parent directory up to
  expanded_url "dir" to prevent file name conflicts.
Args:
expanded_url (StorageUrl): NameExpansionResult.expanded_url value. Should
contain wildcard-expanded URL before recursion. For example, if "d*"
expands to the object "dir/obj", we would get the "dir" value.
Returns:
Boolean indicating if the expanded_url is valid as a parent
directory.
"""
if not isinstance(expanded_url, storage_url.FileUrl):
return True
_, _, last_string_following_delimiter = (
expanded_url.versionless_url_string.rstrip(
expanded_url.delimiter).rpartition(expanded_url.delimiter))
return last_string_following_delimiter not in _RELATIVE_PATH_SYMBOLS and (
last_string_following_delimiter not in [
expanded_url.scheme.value + '://' + symbol
for symbol in _RELATIVE_PATH_SYMBOLS
])
class CopyTaskIterator:
"""Iterates over each expanded source and creates an appropriate copy task."""
def __init__(
self,
source_name_iterator,
destination_string,
custom_md5_digest=None,
delete_source=False,
do_not_decompress=False,
force_daisy_chain=False,
print_created_message=False,
shared_stream=None,
skip_unsupported=True,
task_status_queue=None,
user_request_args=None,
folders_only=False,
):
"""Initializes a CopyTaskIterator instance.
Args:
source_name_iterator (name_expansion.NameExpansionIterator): yields
resource_reference.Resource objects with expanded source URLs.
destination_string (str): The copy destination path or url.
custom_md5_digest (str|None): User-added MD5 hash output to send to server
for validating a single resource upload.
delete_source (bool): If copy completes successfully, delete the source
object afterwards.
do_not_decompress (bool): Prevents automatically decompressing downloaded
gzips.
force_daisy_chain (bool): If True, yields daisy chain copy tasks in place
of intra-cloud copy tasks.
print_created_message (bool): Print the versioned URL of each successfully
copied object.
shared_stream (stream): Multiple tasks may reuse a read or write stream.
skip_unsupported (bool): Skip creating copy tasks for unsupported object
types.
task_status_queue (multiprocessing.Queue|None): Used for estimating total
workload from this iterator.
user_request_args (UserRequestArgs|None): Values for RequestConfig.
      folders_only (bool): If True, perform folder copy tasks only.
"""
self._all_versions = (
source_name_iterator.object_state
is cloud_api.ObjectState.LIVE_AND_NONCURRENT
)
self._has_multiple_top_level_sources = (
source_name_iterator.has_multiple_top_level_resources)
self._has_cloud_source = False
self._has_local_source = False
self._source_name_iterator = (
plurality_checkable_iterator.PluralityCheckableIterator(
source_name_iterator))
self._multiple_sources = self._source_name_iterator.is_plural()
self._custom_md5_digest = custom_md5_digest
self._delete_source = delete_source
self._do_not_decompress = do_not_decompress
self._force_daisy_chain = force_daisy_chain
self._print_created_message = print_created_message
self._shared_stream = shared_stream
self._skip_unsupported = skip_unsupported
self._task_status_queue = task_status_queue
self._user_request_args = user_request_args
self._folders_only = folders_only
self._total_file_count = 0
self._total_size = 0
self._raw_destination = _get_raw_destination(
destination_string, self._folders_only
)
if self._multiple_sources:
self._raise_if_destination_is_file_url_and_not_a_directory_or_pipe()
else:
      # For multiple sources,
      # _raise_if_destination_is_file_url_and_not_a_directory_or_pipe above
      # already checks for the directory's existence, so this check is only
      # needed for the single-source case.
self._raise_if_download_destination_ends_with_delimiter_and_does_not_exist()
if self._multiple_sources and self._custom_md5_digest:
raise errors.Error(
'Received multiple objects to upload, but only one'
' custom MD5 digest is allowed.'
)
self._already_completed_sources = manifest_util.parse_for_completed_sources(
getattr(user_request_args, 'manifest_path', None))
def _raise_error_if_source_matches_destination(self):
if not self._multiple_sources and not self._source_name_iterator.is_empty():
source_url = self._source_name_iterator.peek().expanded_url
if source_url == self._raw_destination.storage_url:
raise errors.InvalidUrlError(
'Source URL matches destination URL: {}'.format(source_url))
def _raise_error_if_expanded_source_matches_expanded_destination(
self, expanded_source_url, expanded_destination_url
):
if expanded_source_url == expanded_destination_url:
raise errors.InvalidUrlError(
'Destination URL {} already exists.'.format(expanded_destination_url)
)
def _raise_if_destination_is_file_url_and_not_a_directory_or_pipe(self):
if (isinstance(self._raw_destination.storage_url, storage_url.FileUrl) and
not (_destination_is_container(self._raw_destination) or
self._raw_destination.storage_url.is_stream)):
raise errors.InvalidUrlError(
'Destination URL must name an existing directory.'
' Provided: {}.'.format(
self._raw_destination.storage_url.resource_name))
def _raise_if_download_destination_ends_with_delimiter_and_does_not_exist(
self,
):
if isinstance(self._raw_destination.storage_url, storage_url.FileUrl):
# Download operation.
destination_path = self._raw_destination.storage_url.resource_name
if destination_path.endswith(
self._raw_destination.storage_url.delimiter
) and not self._raw_destination.storage_url.isdir():
raise errors.InvalidUrlError(
'Destination URL must name an existing directory if it ends with a'
' delimiter. Provided: {}.'.format(destination_path)
)
def _update_workload_estimation(self, resource):
"""Updates total_file_count and total_size.
Args:
resource (resource_reference.Resource): Any type of resource. Parse to
help estimate total workload.
"""
if self._total_file_count == -1 or self._total_size == -1:
      # -1 is a sentinel indicating the data is corrupt and not worth tracking.
return
try:
if resource.is_container():
return
size = resource.size
if isinstance(resource, resource_reference.FileObjectResource):
self._has_local_source = True
elif isinstance(resource, resource_reference.ObjectResource):
self._has_cloud_source = True
else:
raise errors.ValueCannotBeDeterminedError
except (OSError, errors.ValueCannotBeDeterminedError):
if not _resource_is_stream(resource):
log.error('Could not get size of resource {}.'.format(resource))
self._total_file_count = -1
self._total_size = -1
else:
self._total_file_count += 1
self._total_size += size or 0
def _print_skip_and_maybe_send_to_manifest(self, message, source):
"""Prints why task is being skipped and maybe records in manifest."""
log.status.Print(message)
if (
self._user_request_args
and self._user_request_args.manifest_path
and self._task_status_queue
):
manifest_util.send_skip_message(
self._task_status_queue,
source.resource,
self._raw_destination,
message,
)
def __iter__(self):
self._raise_error_if_source_matches_destination()
is_source_plural = self._source_name_iterator.is_plural()
for source in self._source_name_iterator:
if self._folders_only and not isinstance(
source.resource, resource_reference.FolderResource
):
continue
if self._delete_source:
copy_util.raise_if_mv_early_deletion_fee_applies(source.resource)
if self._skip_unsupported:
unsupported_type = resource_util.get_unsupported_object_type(
source.resource)
if unsupported_type:
message = resource_util.UNSUPPORTED_OBJECT_WARNING_FORMAT.format(
source.resource.storage_url, unsupported_type.value
)
self._print_skip_and_maybe_send_to_manifest(message, source)
continue
if (
source.resource.storage_url.url_string
in self._already_completed_sources
):
message = (
'Skipping item {} because manifest marks it as'
' skipped or completed.'
).format(source.resource.storage_url)
self._print_skip_and_maybe_send_to_manifest(message, source)
continue
destination_resource = self._get_copy_destination(
self._raw_destination, source, is_source_plural
)
source_url = source.resource.storage_url
destination_url = destination_resource.storage_url
self._raise_error_if_expanded_source_matches_expanded_destination(
source_url, destination_url
)
if (
self._folders_only
and self._delete_source
and (
source_url.scheme != destination_url.scheme
or source_url.bucket_name != destination_url.bucket_name
)
):
continue
posix_util.run_if_setting_posix(
posix_to_set=None,
user_request_args=self._user_request_args,
function=posix_util.raise_if_source_and_destination_not_valid_for_preserve_posix,
source_url=source_url,
destination_url=destination_url,
)
if (isinstance(source.resource, resource_reference.ObjectResource) and
isinstance(destination_url, storage_url.FileUrl) and
destination_url.resource_name.endswith(destination_url.delimiter)):
log.debug('Skipping downloading {} to {} since the destination ends in'
' a file system delimiter.'.format(
source_url.versionless_url_string,
destination_url.versionless_url_string))
continue
if (not self._multiple_sources and source_url.versionless_url_string !=
source.expanded_url.versionless_url_string):
        # Multiple sources have already been validated in __init__.
# This check is required for cases where recursion has been requested,
# but there is only one object that needs to be copied over.
self._raise_if_destination_is_file_url_and_not_a_directory_or_pipe()
if self._custom_md5_digest:
source.resource.md5_hash = self._custom_md5_digest
self._update_workload_estimation(source.resource)
yield copy_task_factory.get_copy_task(
source.resource,
destination_resource,
do_not_decompress=self._do_not_decompress,
delete_source=self._delete_source,
force_daisy_chain=self._force_daisy_chain,
print_created_message=self._print_created_message,
print_source_version=(
source.original_url.generation or self._all_versions
),
shared_stream=self._shared_stream,
verbose=True,
user_request_args=self._user_request_args,
)
if self._task_status_queue and (
self._total_file_count > 0 or self._total_size > 0
):
# Show fraction of total copies completed now that we know totals.
progress_callbacks.workload_estimator_callback(
self._task_status_queue,
item_count=self._total_file_count,
size=self._total_size,
)
if (
self._total_size > _ONE_TB_IN_BYTES
and self._has_cloud_source
and not self._has_local_source
and self._raw_destination.storage_url.scheme
is storage_url.ProviderPrefix.GCS
and properties.VALUES.storage.suggest_transfer.GetBool()
):
log.status.Print(
'For large copies, consider the `gcloud transfer jobs create ...`'
' command. Learn more at'
'\nhttps://cloud.google.com/storage-transfer-service'
'\nRun `gcloud config set storage/suggest_transfer False` to'
' disable this message.'
)
def _get_copy_destination(
self, raw_destination, source, is_source_plural=False
):
"""Returns the final destination StorageUrl instance."""
completion_is_necessary = (
_destination_is_container(raw_destination)
or (self._multiple_sources and not _resource_is_stream(raw_destination))
or source.resource.storage_url.versionless_url_string
!= source.expanded_url.versionless_url_string # Recursion case.
)
if completion_is_necessary:
if (
isinstance(source.expanded_url, storage_url.FileUrl)
and source.expanded_url.is_stdio
):
raise errors.Error(
'Destination object name needed when source is stdin.'
)
destination_resource = self._complete_destination(
raw_destination, source, is_source_plural
)
else:
destination_resource = raw_destination
sanitized_destination_resource = (
path_util.sanitize_file_resource_for_windows(destination_resource)
)
return sanitized_destination_resource
def _complete_destination(
self, destination_container, source, is_source_plural=False
):
"""Gets a valid copy destination incorporating part of the source's name.
When given a source file or object and a destination resource that should
be treated as a container, this function uses the last part of the source's
name to get an object or file resource representing the copy destination.
For example: given a source `dir/file` and a destination `gs://bucket/`, the
destination returned is a resource representing `gs://bucket/file`. Check
the recursive helper function docstring for details on recursion handling.
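
    In the recursion case (illustrative): if the wildcard `d*` expands to
    `dir` and the source object is `dir/file`, copying into an existing
    `gs://bucket` completes to `gs://bucket/dir/file`.
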
Args:
destination_container (resource_reference.Resource): The destination
container.
source (NameExpansionResult): Represents the source resource and the
expanded parent url in case of recursion.
is_source_plural (bool): True if the source is a plural resource.
Returns:
The completed destination, a resource_reference.Resource.
"""
destination_url = destination_container.storage_url
source_url = source.resource.storage_url
if (
source_url.versionless_url_string
!= source.expanded_url.versionless_url_string
):
# In case of recursion, the expanded_url can be the expanded wildcard URL
# representing the container, and the source url can be the file/object.
destination_suffix = self._get_destination_suffix_for_recursion(
destination_container, source
)
else:
# On Windows with a relative path URL like file://file.txt, partitioning
# on the delimiter will fail to remove file://, so destination_suffix
# would include the scheme. We remove the scheme here to avoid this.
_, _, url_without_scheme = source_url.versionless_url_string.rpartition(
source_url.scheme.value + '://'
)
# Ignores final slashes when completing names. For example, where
# source_url is gs://bucket/folder/ and destination_url is gs://bucket1,
# the completed URL should be gs://bucket1/folder/.
if url_without_scheme.endswith(source_url.delimiter):
url_without_scheme_and_trailing_delimiter = (
url_without_scheme[:-len(source_url.delimiter)]
)
else:
url_without_scheme_and_trailing_delimiter = url_without_scheme
_, _, destination_suffix = (
url_without_scheme_and_trailing_delimiter.rpartition(
source_url.delimiter
)
)
if url_without_scheme_and_trailing_delimiter != url_without_scheme:
# Adds the removed delimiter back.
destination_suffix += source_url.delimiter
destination_url_prefix = storage_url.storage_url_from_string(
destination_url.versionless_url_string.rstrip(destination_url.delimiter)
)
    # For the folders use case, if the destination does not exist, we want to
    # rename/copy the source folder to the destination name itself. This
    # mirrors filesystem behavior and does not happen for flat buckets today.
    # Hence this additional logic.
if (
self._folders_only
and isinstance(source.resource, resource_reference.FolderResource)
and not isinstance(
destination_container, resource_reference.FolderResource
)
and not is_source_plural
):
return resource_reference.UnknownResource(destination_url_prefix)
new_destination_url = destination_url_prefix.join(destination_suffix)
return resource_reference.UnknownResource(new_destination_url)
def _get_destination_suffix_for_recursion(
self, destination_container, source
):
"""Returns the suffix required to complete the destination URL.
Let's assume the following:
User command => cp -r */base_dir gs://dest/existing_prefix
source.resource.storage_url => a/base_dir/c/d.txt
source.expanded_url => a/base_dir
destination_container.storage_url => gs://dest/existing_prefix
If the destination container exists, the entire directory gets copied:
Result => gs://dest/existing_prefix/base_dir/c/d.txt
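
    If the destination container does not exist and does not end with a
    delimiter, the contents of base_dir are copied directly under the
    destination name (illustrative, assuming a single top-level source):
    Result => gs://dest/existing_prefix/c/d.txt
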
Args:
destination_container (resource_reference.Resource): The destination
container.
source (NameExpansionResult): Represents the source resource and the
expanded parent url in case of recursion.
Returns:
(str) The suffix to be appended to the destination container.
"""
source_prefix_to_ignore = storage_url.rstrip_one_delimiter(
source.expanded_url.versionless_url_string,
source.expanded_url.delimiter,
)
expanded_url_is_valid_parent = _is_expanded_url_valid_parent_dir(
source.expanded_url
)
if (
not expanded_url_is_valid_parent
and self._has_multiple_top_level_sources
):
# To avoid top-level name conflicts, we need to copy the parent dir.
# However, that cannot be done because the parent dir has an invalid name.
raise errors.InvalidUrlError(
'Presence of multiple top-level sources and invalid expanded URL'
' make file name conflicts possible for URL: {}'.format(
source.resource
)
)
is_top_level_source_object_name_conflict_possible = (
isinstance(destination_container, resource_reference.UnknownResource)
and self._has_multiple_top_level_sources
)
destination_exists = not isinstance(
destination_container, resource_reference.UnknownResource
)
destination_is_existing_dir = (
destination_exists and destination_container.is_container()
)
treat_destination_as_existing_dir = destination_is_existing_dir or (
not destination_exists
and destination_container.storage_url.url_string.endswith(
destination_container.storage_url.delimiter
)
)
if is_top_level_source_object_name_conflict_possible or (
expanded_url_is_valid_parent and treat_destination_as_existing_dir
):
# Remove the leaf name unless it is a relative path symbol, so that
# only top-level source directories are ignored.
# Presence of relative path symbols needs to be checked with the source
# to distinguish file://dir.. from file://dir/..
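      # For example (illustrative): for a source_prefix_to_ignore of
      # "file://a/base_dir", the leaf "base_dir" is removed, leaving
      # "file://a", so "base_dir" is kept in the destination suffix.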
source_delimiter = source.resource.storage_url.delimiter
relative_path_characters_end_source_prefix = [
source_prefix_to_ignore.endswith(source_delimiter + i)
for i in _RELATIVE_PATH_SYMBOLS
]
# On Windows, source paths that are relative path symbols will not contain
# the source delimiter, e.g. file://.. This case thus needs to be detected
# separately.
source_url_scheme_string = source.expanded_url.scheme.value + '://'
source_prefix_to_ignore_without_scheme = source_prefix_to_ignore[
len(source_url_scheme_string):]
source_is_relative_path_symbol = (
source_prefix_to_ignore_without_scheme in _RELATIVE_PATH_SYMBOLS)
if (not any(relative_path_characters_end_source_prefix) and
not source_is_relative_path_symbol):
source_prefix_to_ignore, _, _ = source_prefix_to_ignore.rpartition(
source.expanded_url.delimiter)
if not source_prefix_to_ignore:
# In case of Windows, the source URL might not contain any Windows
# delimiter if it was a single directory (e.g file://dir) and
# source_prefix_to_ignore will be empty. Set it to <scheme>://.
# TODO(b/169093672) This will not be required if we get rid of file://
source_prefix_to_ignore = source.expanded_url.scheme.value + '://'
full_source_url = source.resource.storage_url.versionless_url_string
delimiter = source.resource.storage_url.delimiter
suffix_for_destination = delimiter + (
full_source_url.split(source_prefix_to_ignore)[1]
).lstrip(delimiter)
# Windows uses \ as a delimiter. Force the suffix to use the same
# delimiter used by the destination container.
source_delimiter = source.resource.storage_url.delimiter
destination_delimiter = destination_container.storage_url.delimiter
if source_delimiter != destination_delimiter:
return suffix_for_destination.replace(
source_delimiter, destination_delimiter
)
return suffix_for_destination