# File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/storage/rsync_command_util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils for the rsync command."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import enum
import os

from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import fast_crc32c_util
from googlecloudsdk.command_lib.storage import hash_util
from googlecloudsdk.command_lib.storage import path_util
from googlecloudsdk.command_lib.storage import plurality_checkable_iterator
from googlecloudsdk.command_lib.storage import posix_util
from googlecloudsdk.command_lib.storage import progress_callbacks
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.storage import tracker_file_util
from googlecloudsdk.command_lib.storage import wildcard_iterator
from googlecloudsdk.command_lib.storage.resources import resource_reference
from googlecloudsdk.command_lib.storage.resources import resource_util
from googlecloudsdk.command_lib.storage.tasks import patch_file_posix_task
from googlecloudsdk.command_lib.storage.tasks.cp import copy_task_factory
from googlecloudsdk.command_lib.storage.tasks.cp import copy_util
from googlecloudsdk.command_lib.storage.tasks.objects import patch_object_task
from googlecloudsdk.command_lib.storage.tasks.rm import delete_task
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import files
import six


CSV_FIELD_SEPARATOR = ','
_CSV_COLUMNS_COUNT = 10
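# Note: _CSV_COLUMNS_COUNT is the rsplit maxsplit value, not the column total.
# A full line has 11 columns (see get_csv_line_from_resource), so capping at
# 10 splits from the right keeps any commas inside the leading URL intact.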
_NO_MATCHES_MESSAGE = 'Did not find existing container at: {}'


# Used to distinguish files containing objects and files containing managed
# folders. Added to a file name which is hashed.
_MANAGED_FOLDER_PREFIX = 'managed_folders'


def get_existing_or_placeholder_destination_resource(
    path, ignore_symlinks=True
):
  """Returns existing valid container or UnknownResource or raises."""
  resource_iterator = wildcard_iterator.get_wildcard_iterator(
      path,
      fields_scope=cloud_api.FieldsScope.SHORT,
      get_bucket_metadata=True,
      ignore_symlinks=ignore_symlinks,
  )
  plurality_checkable_resource_iterator = (
      plurality_checkable_iterator.PluralityCheckableIterator(resource_iterator)
  )

  if plurality_checkable_resource_iterator.is_empty():
    if wildcard_iterator.contains_wildcard(path):
      raise errors.InvalidUrlError(
          'Wildcard pattern matched nothing. '
          + _NO_MATCHES_MESSAGE.format(path)
      )
    return resource_reference.UnknownResource(
        storage_url.storage_url_from_string(path)
    )

  if plurality_checkable_resource_iterator.is_plural():
    raise errors.InvalidUrlError(
        '{} matched more than one URL: {}'.format(
            path, list(plurality_checkable_resource_iterator)
        )
    )

  resource = list(plurality_checkable_resource_iterator)[0]
  if resource.is_container():
    return resource
  raise errors.InvalidUrlError(
      '{} matched non-container URL: {}'.format(path, resource)
  )


def get_existing_container_resource(path, ignore_symlinks=True):
  """Gets existing container resource at path and errors otherwise."""
  resource = get_existing_or_placeholder_destination_resource(
      path, ignore_symlinks
  )
  if isinstance(resource, resource_reference.UnknownResource):
    raise errors.InvalidUrlError(_NO_MATCHES_MESSAGE.format(path))
  return resource


def get_hashed_list_file_path(
    list_file_name, chunk_number=None, is_managed_folder_list=False
):
  """Hashes and returns a list file path.

  Args:
    list_file_name (str): The list file name prior to it being hashed.
    chunk_number (int|None): The number of the chunk fetched if the file
      represents a chunk of the total list.
    is_managed_folder_list (bool): If True, the file will contain managed folder
      resources instead of object resources, and should have a different name.

  Returns:
    str: Final (hashed) list file path.

  Raises:
    Error: Hashed file path is too long.
  """
  delimiterless_file_name = tracker_file_util.get_delimiterless_file_path(
      list_file_name
  )

  # Added as a prefix, since a suffix of delimiterless_file_name is added to the
  # hashed file name.
  managed_folder_prefix = (
      _MANAGED_FOLDER_PREFIX if is_managed_folder_list else ''
  )
  hashed_file_name = tracker_file_util.get_hashed_file_name(
      managed_folder_prefix + delimiterless_file_name
  )

  if chunk_number is None:
    hashed_file_name_with_type = 'FULL_{}'.format(hashed_file_name)
  else:
    hashed_file_name_with_type = 'CHUNK_{}_{}'.format(
        hashed_file_name, chunk_number
    )

  tracker_file_util.raise_exceeds_max_length_error(hashed_file_name_with_type)
  return os.path.join(
      properties.VALUES.storage.rsync_files_directory.Get(),
      hashed_file_name_with_type,
  )
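

# Illustrative sketch (hypothetical arguments): the returned path lands under
# the configured rsync files directory and encodes list type and chunking:
#
#   get_hashed_list_file_path('gs://bucket/dir')     # -> .../FULL_<hash>
#   get_hashed_list_file_path('gs://bucket/dir', 3)  # -> .../CHUNK_<hash>_3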


def try_to_delete_file(path):
  """Tries to delete file and debug logs instead of failing on error."""
  try:
    os.remove(path)
  except Exception as e:  # pylint:disable=broad-except
    log.debug('Failed to delete file {}: {}'.format(path, e))


def get_csv_line_from_resource(resource):
  """Builds a line for files listing the contents of the source and destination.

  Args:
    resource (FileObjectResource|ObjectResource|ManagedFolderResource): Contains
      item URL and metadata, which can be generated from the local file in the
      case of FileObjectResource.

  Returns:
    String formatted as
      "URL,etag,size,storage_class,atime,mtime,uid,gid,mode,crc32c,md5".
      A missing field is represented as an empty string.
      "mtime" means "modification time", a Unix timestamp in UTC.
      "mode" is in base-eight (octal) form, e.g. "440".
  """
  url = resource.storage_url.url_string
  if isinstance(resource, resource_reference.ManagedFolderResource):
    # Managed folders are not associated with any metadata we can use in diffs,
    # other than their name.
    return url

  if isinstance(resource, resource_reference.FileObjectResource):
    etag = None
    size = None
    storage_class = None
    atime = None
    mtime = None
    uid = None
    gid = None
    mode_base_eight = None
    crc32c = None
    md5 = None
  else:
    etag = resource.etag
    size = resource.size
    storage_class = resource.storage_class
    atime, custom_metadata_mtime, uid, gid, mode = (
        posix_util.get_posix_attributes_from_cloud_resource(resource)
    )
    if custom_metadata_mtime is not None:
      mtime = custom_metadata_mtime
    else:
      # Use cloud object creation time as modification time. Since cloud objects
      # are immutable, creation is the only time of "modification." Populating
      # mtime allows checks to see if we can skip tasks.
      mtime = resource_util.get_unix_timestamp_in_utc(resource.creation_time)

    mode_base_eight = mode.base_eight_str if mode else None
    if resource.crc32c_hash == resource_reference.NOT_SUPPORTED_DO_NOT_DISPLAY:
      crc32c = None
    else:
      crc32c = resource.crc32c_hash
    md5 = resource.md5_hash

  line_values = [
      url,
      etag,
      size,
      storage_class,
      atime,
      mtime,
      uid,
      gid,
      mode_base_eight,
      crc32c,
      md5,
  ]
  return CSV_FIELD_SEPARATOR.join(
      ['' if x is None else six.text_type(x) for x in line_values]
  )
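

# Example (URL form assumed): a local FileObjectResource carries no
# precomputed metadata, so every field after the URL serializes to an empty
# string:
#
#   get_csv_line_from_resource(
#       resource_reference.FileObjectResource(
#           storage_url.storage_url_from_string('/tmp/a.txt')))
#   # -> 'file:///tmp/a.txt,,,,,,,,,,'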


def get_fields_from_csv_line(line):
  """Splits and returns the fields from a CSV line."""
  # Capping splits prevents commas in the URL from being treated as separators.
  return line.rstrip().rsplit(CSV_FIELD_SEPARATOR, _CSV_COLUMNS_COUNT)
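

# Example: with rsplit capped at _CSV_COLUMNS_COUNT (10), only the ten
# rightmost commas delimit fields, so a comma inside the URL survives:
#
#   get_fields_from_csv_line('gs://b/a,b.txt,etag,10,STANDARD,1,2,3,4,440,c,m')
#   # -> ['gs://b/a,b.txt', 'etag', '10', 'STANDARD', '1', '2', '3', '4',
#   #     '440', 'c', 'm']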


def parse_csv_line_to_resource(line, is_managed_folder=False):
  """Parses a line from files listing of rsync source and destination.

  Args:
    line (str|None): CSV line. See `get_csv_line_from_resource` docstring.
    is_managed_folder (bool): If True, returns a managed folder resource for
      cloud URLs. Otherwise, returns an object resource.

  Returns:
    FileObjectResource|ManagedFolderResource|ObjectResource|None: Resource
      containing data needed for rsync if data line given.
  """
  if not line:
    return None
  # Capping splits prevents commas in the URL from being treated as separators.
  line_information = get_fields_from_csv_line(line)
  url_string = line_information[0]
  url_object = storage_url.storage_url_from_string(url_string)

  if isinstance(url_object, storage_url.FileUrl):
    return resource_reference.FileObjectResource(url_object)

  if is_managed_folder:
    return resource_reference.ManagedFolderResource(url_object)

  (
      _,
      etag_string,
      size_string,
      storage_class_string,
      atime_string,
      mtime_string,
      uid_string,
      gid_string,
      mode_base_eight_string,
      crc32c_string,
      md5_string,
  ) = line_information

  cloud_object = resource_reference.ObjectResource(
      url_object,
      etag=etag_string if etag_string else None,
      size=int(size_string) if size_string else None,
      storage_class=storage_class_string if storage_class_string else None,
      crc32c_hash=crc32c_string if crc32c_string else None,
      md5_hash=md5_string if md5_string else None,
      custom_fields={},
  )
  posix_util.update_custom_metadata_dict_with_posix_attributes(
      cloud_object.custom_fields,
      posix_util.PosixAttributes(
          atime=int(atime_string) if atime_string else None,
          mtime=int(mtime_string) if mtime_string else None,
          uid=int(uid_string) if uid_string else None,
          gid=int(gid_string) if gid_string else None,
          mode=posix_util.PosixMode.from_base_eight_str(mode_base_eight_string)
          if mode_base_eight_string
          else None,
      ),
  )
  return cloud_object
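

# Round-trip sketch (resource values assumed): parsing a serialized cloud
# object line rebuilds an ObjectResource whose POSIX attributes live in
# custom_fields:
#
#   line = get_csv_line_from_resource(cloud_resource)
#   rebuilt = parse_csv_line_to_resource(line)
#   # rebuilt.size, rebuilt.crc32c_hash, etc. match the original; atime,
#   # mtime, uid, gid, and mode are stored via posix_util's custom metadata
#   # helpers.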


def _compute_hashes_and_return_match(source_resource, destination_resource):
  """Does minimal computation to compare checksums of resources."""
  if source_resource.size != destination_resource.size:
    # Prioritizing this above other checks is an artifact from gsutil.
    # Hashes should always be different if size is different.
    return False

  check_hashes = properties.VALUES.storage.check_hashes.Get()
  if check_hashes == properties.CheckHashes.NEVER.value:
    return True

  for resource in (source_resource, destination_resource):
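    # Chained identity test: True only when both crc32c_hash and md5_hash are
    # None.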
    if isinstance(resource, resource_reference.ObjectResource) and (
        resource.crc32c_hash is resource.md5_hash is None
    ):
      log.warning(
          'Found no hashes to validate on {}. Will not copy unless file'
          ' modification times or sizes differ.'.format(
              resource.storage_url.versionless_url_string
          )
      )
      # Doing the copy would be safer, but we skip for parity with gsutil.
      return True

  if isinstance(
      source_resource, resource_reference.ObjectResource
  ) and isinstance(destination_resource, resource_reference.ObjectResource):
    source_crc32c = source_resource.crc32c_hash
    destination_crc32c = destination_resource.crc32c_hash
    source_md5 = source_resource.md5_hash
    destination_md5 = destination_resource.md5_hash
    log.debug(
        'Comparing hashes for two cloud objects. CRC32C checked first.'
        ' If no comparable hash pairs, will not copy.\n'
        '{}:\n'
        '  CRC32C: {}\n'
        '  MD5: {}\n'
        '{}:\n'
        '  CRC32C: {}\n'
        '  MD5: {}\n'.format(
            source_resource.storage_url.versionless_url_string,
            source_crc32c,
            source_md5,
            destination_resource.storage_url.versionless_url_string,
            destination_crc32c,
            destination_md5,
        )
    )
    if source_crc32c is not None and destination_crc32c is not None:
      return source_crc32c == destination_crc32c
    if source_md5 is not None and destination_md5 is not None:
      return source_md5 == destination_md5
    return True

  # Local-to-local rsync not allowed, so one of these is a cloud resource.
  is_upload = isinstance(source_resource, resource_reference.FileObjectResource)
  if is_upload:
    cloud_resource = destination_resource
    local_resource = source_resource
  else:
    cloud_resource = source_resource
    local_resource = destination_resource

  if cloud_resource.crc32c_hash is not None and cloud_resource.md5_hash is None:
    # We must do a CRC32C check.
    # Let existing download flow warn that ALWAYS check may be slow.
    fast_crc32c_util.log_or_raise_crc32c_issues(warn_for_always=is_upload)
    if (
        not fast_crc32c_util.check_if_will_use_fast_crc32c(
            install_if_missing=True
        )
        and check_hashes == properties.CheckHashes.IF_FAST_ELSE_SKIP.value
    ):
      return True
    compare_crc32c = True
  elif cloud_resource.crc32c_hash is not None:
    # Prioritizing CRC32C over MD5 because google-crc32c seems significantly
    # faster than MD5 for gigabyte+ objects.
    compare_crc32c = fast_crc32c_util.check_if_will_use_fast_crc32c(
        install_if_missing=False
    )
  else:
    compare_crc32c = False

  if compare_crc32c:
    hash_algorithm = hash_util.HashAlgorithm.CRC32C
    cloud_hash = cloud_resource.crc32c_hash
  else:
    hash_algorithm = hash_util.HashAlgorithm.MD5
    cloud_hash = cloud_resource.md5_hash

  local_hash = hash_util.get_base64_hash_digest_string(
      hash_util.get_hash_from_file(
          local_resource.storage_url.resource_name, hash_algorithm
      )
  )
  return cloud_hash == local_hash


def _compare_metadata_and_return_copy_needed(
    source_resource,
    destination_resource,
    source_mtime,
    destination_mtime,
    compare_only_hashes=False,
    is_cloud_source_and_destination=False,
):
  """Compares metadata and returns if source should be copied to destination."""
  # Two cloud objects should have pre-generated hashes that are more reliable
  # than mtime for seeing file differences. This ignores the unusual case where
  # cloud hashes are missing, but we still skip mtime for gsutil parity.
  skip_mtime_comparison = compare_only_hashes or is_cloud_source_and_destination
  if (
      not skip_mtime_comparison
      and source_mtime is not None
      and destination_mtime is not None
  ):
    # Ignore hashes like gsutil.
    return not (
        source_mtime == destination_mtime
        and source_resource.size == destination_resource.size
    )

  # Most expensive operation, computing hashes, saved as last resort.
  return not _compute_hashes_and_return_match(
      source_resource, destination_resource
  )
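

# Decision sketch (example values): when both mtimes are known and mtime
# comparison is not skipped, hashes are never computed:
#
#   _compare_metadata_and_return_copy_needed(
#       src, dst, source_mtime=1700000000, destination_mtime=1700000000)
#   # -> False if src.size == dst.size, True otherwise.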


class _IterateResource(enum.Enum):
  """Indicates what resources to compare next."""

  SOURCE = 'source'
  DESTINATION = 'destination'
  BOTH = 'both'


def _should_skip_unsupported_object_type(
    resource, skip_unsupported: bool
) -> bool:
  """Checks if object type is unsupported and logs warning if so.

  Args:
    resource: The resource to check.
    skip_unsupported: Whether to skip unsupported object types.

  Returns:
    True if the object type is unsupported and needs to be skipped,
    False otherwise.
  """
  if skip_unsupported:
    unsupported_type = resource_util.get_unsupported_object_type(resource)
    if unsupported_type:
      log.status.Print(
          resource_util.UNSUPPORTED_OBJECT_WARNING_FORMAT.format(
              resource, unsupported_type.value
          )
      )
      return True
  return False


def _get_copy_task(
    user_request_args,
    source_resource,
    posix_to_set=None,
    source_container=None,
    destination_resource=None,
    destination_container=None,
    dry_run=False,
    skip_unsupported=False,
):
  """Generates copy tasks with generic settings and logic."""
  if _should_skip_unsupported_object_type(source_resource, skip_unsupported):
    return

  if destination_resource:
    copy_destination = destination_resource
  else:
    # Must have destination_container if not destination_resource.
    copy_destination = _get_copy_destination_resource(
        source_resource, source_container, destination_container
    )
  if dry_run:
    if isinstance(source_resource, resource_reference.FileObjectResource):
      try:
        with files.BinaryFileReader(source_resource.storage_url.resource_name):
          pass
      except Exception:  # pylint: disable=broad-except
        log.error(
            'Could not open {}'.format(
                source_resource.storage_url.resource_name
            )
        )
        raise
    log.status.Print(
        'Would copy {} to {}'.format(source_resource, copy_destination)
    )
    return

  if (
      isinstance(source_resource, resource_reference.CloudResource) and (
          isinstance(copy_destination.storage_url, storage_url.CloudUrl)
      )
  ):
    if (
        user_request_args.resource_args
        and user_request_args.resource_args.preserve_acl
    ):
      fields_scope = cloud_api.FieldsScope.FULL
    else:
      fields_scope = cloud_api.FieldsScope.NO_ACL
  else:
    fields_scope = None

  return copy_task_factory.get_copy_task(
      source_resource,
      copy_destination,
      do_not_decompress=True,
      fetch_source_fields_scope=fields_scope,
      posix_to_set=posix_to_set,
      user_request_args=user_request_args,
      verbose=True,
  )


def _get_iterator_instruction_for_no_clobber_and_new_mtimes(
    delete_unmatched_destination_objects,
    use_gsutil_delete_unmatched_behaviour,
):
  """Returns the iterator instruction for no clobber and new mtimes flags."""

  # If forcing deletion, we only advance the source, allowing the destination
  # to be considered 'unmatched' and deleted in a subsequent step if it lacks
  # a source counterpart.
  # Otherwise, for the standard (gsutil) behavior, we advance both to mark the
  # destination as 'handled' and prevent its deletion.
  iteration_instruction = _IterateResource.SOURCE
  if (
      delete_unmatched_destination_objects
      and use_gsutil_delete_unmatched_behaviour
  ):
    iteration_instruction = _IterateResource.BOTH
  return iteration_instruction
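

# Behavior summary for the helper above:
#
#   delete_unmatched  gsutil_behaviour  -> instruction
#   False             False             -> SOURCE
#   False             True              -> SOURCE
#   True              False             -> SOURCE (destination may later be
#                                          deleted as "unmatched")
#   True              True              -> BOTH (destination marked handled)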


def _compare_equal_object_urls_to_get_task_and_iteration_instruction(
    user_request_args,
    source_object,
    destination_object,
    posix_to_set,
    compare_only_hashes=False,
    dry_run=False,
    skip_if_destination_has_later_modification_time=False,
    skip_unsupported=False,
    delete_unmatched_destination_objects=False,
    use_gsutil_delete_unmatched_behaviour=False,
):
  """Similar to get_task_and_iteration_instruction except for equal URLs."""

  destination_posix = posix_util.get_posix_attributes_from_resource(
      destination_object
  )
  if (
      skip_if_destination_has_later_modification_time
      and posix_to_set.mtime is not None
      and destination_posix.mtime is not None
      and posix_to_set.mtime < destination_posix.mtime
  ):
    # This is technically a metadata comparison, but it would complicate
    # `_compare_metadata_and_return_copy_needed`.
    iteration_instruction = (
        _get_iterator_instruction_for_no_clobber_and_new_mtimes(
            delete_unmatched_destination_objects,
            use_gsutil_delete_unmatched_behaviour,
        )
    )
    return (None, iteration_instruction)

  is_cloud_source_and_destination = isinstance(
      source_object, resource_reference.ObjectResource
  ) and isinstance(destination_object, resource_reference.ObjectResource)
  if _compare_metadata_and_return_copy_needed(
      source_object,
      destination_object,
      posix_to_set.mtime,
      destination_posix.mtime,
      compare_only_hashes=compare_only_hashes,
      is_cloud_source_and_destination=is_cloud_source_and_destination,
  ):
    # Possible performance improvement would be adding infra to pass the known
    # POSIX info to upload tasks to avoid an `os.stat` call.
    return (
        _get_copy_task(
            user_request_args,
            source_object,
            posix_to_set=posix_to_set,
            destination_resource=destination_object,
            dry_run=dry_run,
            skip_unsupported=skip_unsupported,
        ),
        _IterateResource.BOTH,
    )

  need_full_posix_update = (
      user_request_args.preserve_posix and posix_to_set != destination_posix
  )

  # Since cloud-to-cloud uses hash comparisons instead of mtime, little reason
  # to waste an API call performing an mtime patch.
  need_mtime_update = (
      not is_cloud_source_and_destination
      and posix_to_set.mtime is not None
      and posix_to_set.mtime != destination_posix.mtime
  )
  if not (need_full_posix_update or need_mtime_update):
    return (None, _IterateResource.BOTH)

  if dry_run:
    if need_full_posix_update:
      log.status.Print(
          'Would set POSIX attributes for {}'.format(destination_object)
      )
    else:
      log.status.Print('Would set mtime for {}'.format(destination_object))
    return (None, _IterateResource.BOTH)

  if isinstance(destination_object, resource_reference.ObjectResource):
    return (
        patch_object_task.PatchObjectTask(
            destination_object,
            posix_to_set=posix_to_set,
            user_request_args=user_request_args,
        ),
        _IterateResource.BOTH,
    )
  return (
      patch_file_posix_task.PatchFilePosixTask(
          posix_util.get_system_posix_data(),
          source_object,
          destination_object,
          posix_to_set,
          destination_posix,
      ),
      _IterateResource.BOTH,
  )


def _get_url_string_minus_base_container(object_resource, container_resource):
  """Removes container URL prefix from object URL."""
  container_url = container_resource.storage_url
  container_url_string_with_trailing_delimiter = container_url.join(
      ''
  ).versionless_url_string
  object_url_string = object_resource.storage_url.versionless_url_string
  if not object_url_string.startswith(
      container_url_string_with_trailing_delimiter
  ):
    raise errors.Error(
        'Received container {} that does not contain object {}.'.format(
            container_url_string_with_trailing_delimiter, object_url_string
        )
    )
  return object_url_string[len(container_url_string_with_trailing_delimiter) :]
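

# Example (hypothetical URLs): with container gs://bucket/dir and object
# gs://bucket/dir/a/b.txt, the trailing-delimiter form of the container is
# 'gs://bucket/dir/', so the returned suffix is 'a/b.txt'.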


def _get_comparison_url(object_resource, container_resource):
  """Gets URL to compare to decide if resources are the same."""
  containerless_object_url_string = _get_url_string_minus_base_container(
      object_resource, container_resource
  )
  # Standardizes Windows URLs.
  return containerless_object_url_string.replace(
      container_resource.storage_url.delimiter, storage_url.CLOUD_URL_DELIMITER
  )
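

# Example (hypothetical paths): a Windows file C:\data\a\b.txt under the
# container C:\data normalizes to 'a/b.txt' after delimiter replacement, so
# it compares equal to cloud object gs://bucket/a/b.txt under gs://bucket.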


def _get_copy_destination_resource(
    source_resource, source_container, destination_container
):
  """Gets destination resource needed for copy tasks."""
  containerless_source_string = _get_url_string_minus_base_container(
      source_resource, source_container
  )
  destination_delimited_containerless_source_string = (
      containerless_source_string.replace(
          source_resource.storage_url.delimiter,
          destination_container.storage_url.delimiter,
      )
  )
  new_destination_object_url = destination_container.storage_url.join(
      destination_delimited_containerless_source_string
  )

  new_destination_resource = resource_reference.UnknownResource(
      new_destination_object_url
  )

  return path_util.sanitize_file_resource_for_windows(new_destination_resource)
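

# Sketch (hypothetical URLs): copying gs://src/dir/a/b.txt with source
# container gs://src/dir into destination container file:///backup yields an
# UnknownResource at file:///backup/a/b.txt, with source delimiters translated
# to the destination's.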


def _log_skipping_symlink(resource):
  log.warning('Skipping symlink {}'.format(resource))


def _print_would_remove(resource):
  log.status.Print('Would remove {}'.format(resource))


def _get_delete_task(resource, user_request_args):
  url = resource.storage_url
  if isinstance(url, storage_url.FileUrl):
    return delete_task.DeleteFileTask(
        url,
        user_request_args=user_request_args,
    )
  else:
    return delete_task.DeleteObjectTask(
        url,
        user_request_args=user_request_args,
    )


def _should_delete_unmatched_destination_object(
    destination_resource,
    delete_unmatched_destination_objects,
    use_gsutil_delete_unmatched_behaviour,
    skip_unsupported,
):
  """Returns True if the unmatched destination object should be deleted."""

  # If the flag is not set, we never delete unmatched destination objects.
  if not delete_unmatched_destination_objects:
    return False

  # If the flag is set and the gsutil behavior is used, we skip deletion if the
  # object type is unsupported.
  if use_gsutil_delete_unmatched_behaviour:
    return not _should_skip_unsupported_object_type(
        destination_resource, skip_unsupported
    )

  return True


def _get_task_and_iteration_for_unmatched_destination(
    user_request_args,
    destination_resource,
    delete_unmatched_destination_objects: bool = False,
    use_gsutil_delete_unmatched_behaviour: bool = False,
    dry_run: bool = False,
    skip_unsupported: bool = False,
):
  """Gets task and iteration instruction for deleting/preserving an unmatched destination while used with delete_unmatched_destination_objects.

  Args:
    user_request_args: User flags.
    destination_resource: Destination resource to check for deletion.
    delete_unmatched_destination_objects: Clear objects at the destination that
      are not present at the source.
    use_gsutil_delete_unmatched_behaviour: When
      delete_unmatched_destination_objects is set, preserves destination
      objects that other flags would normally preserve, matching gsutil's
      behavior.
    dry_run: Print what operations rsync would perform without actually
      executing them.
    skip_unsupported: Skip copying unsupported object types.

  Returns:
    A pair of task and iteration instruction.
      - A delete task if applicable, or None.
      - An iteration instruction for the next step.
  """
  # Managed folders are never deleted, regardless of flags.
  if isinstance(destination_resource, resource_reference.ManagedFolderResource):
    return (None, _IterateResource.DESTINATION)

  should_delete_unmatched_destination_object = (
      _should_delete_unmatched_destination_object(
          destination_resource,
          delete_unmatched_destination_objects,
          use_gsutil_delete_unmatched_behaviour,
          skip_unsupported,
      )
  )

  if not should_delete_unmatched_destination_object:
    return (None, _IterateResource.DESTINATION)

  if dry_run:
    _print_would_remove(destination_resource)
    return (None, _IterateResource.DESTINATION)

  return (
      _get_delete_task(destination_resource, user_request_args),
      _IterateResource.DESTINATION,
  )


def _get_task_and_iteration_instruction(
    user_request_args,
    source_resource,
    source_container,
    destination_resource,
    destination_container,
    compare_only_hashes=False,
    delete_unmatched_destination_objects=False,
    dry_run=False,
    ignore_symlinks=False,
    skip_if_destination_has_later_modification_time=False,
    skip_unsupported=False,
):
  """Compares resources and returns next rsync step.

  Args:
    user_request_args (UserRequestArgs): User flags.
    source_resource: Source resource for comparison, a FileObjectResource,
      ManagedFolderResource, ObjectResource, or None. `None` indicates no
      sources left to copy.
    source_container (FileDirectoryResource|PrefixResource|BucketResource):
      Stripped from beginning of source_resource to get comparison URL.
    destination_resource: Destination resource for comparison, a
      FileObjectResource, ManagedFolderResource, ObjectResource, or None. `None`
      indicates all remaining source resources are new.
    destination_container (FileDirectoryResource|PrefixResource|BucketResource):
      If a copy task is generated for a source item with no equivalent existing
      destination item, it will copy to this general container. Also used to get
      comparison URL.
    compare_only_hashes (bool): Skip modification time comparison.
    delete_unmatched_destination_objects (bool): Clear objects at the
      destination that are not present at the source.
    dry_run (bool): Print what operations rsync would perform without actually
      executing them.
    ignore_symlinks (bool): Skip operations involving symlinks.
    skip_if_destination_has_later_modification_time (bool): Don't act if mtime
      metadata indicates we'd be overwriting with an older version of an object.
    skip_unsupported (bool): Skip copying unsupported object types.

  Returns:
    A pair with a task and an iteration instruction.

    First entry:
    None: Don't do anything for these resources.
    DeleteTask: Remove an extra resource from the destination.
    FileDownloadTask|FileUploadTask|IntraCloudCopyTask|ManagedFolderCopyTask:
      Update the destination with a copy of the source object.
    PatchFilePosixTask: Update the file destination POSIX data with the source's
      POSIX data.
    PatchObjectTask: Update the cloud destination's POSIX data with the source's
      POSIX data.

    Second entry:
    _IterateResource: Enum value indicating what to compare next.

  Raises:
    errors.Error: Missing a resource (does not account for subfunction errors).
  """
  if not (source_resource or destination_resource):
    raise errors.Error(
        'Comparison requires at least a source or a destination.'
    )

  use_gsutil_delete_unmatched_behaviour = (
      properties.VALUES.storage.use_gsutil_rsync_delete_unmatched_destination_objects_behavior.GetBool()
  )

  if not source_resource:
    return _get_task_and_iteration_for_unmatched_destination(
        user_request_args,
        destination_resource,
        delete_unmatched_destination_objects,
        use_gsutil_delete_unmatched_behaviour,
        dry_run,
        skip_unsupported,
    )

  if ignore_symlinks and source_resource.is_symlink:
    _log_skipping_symlink(source_resource)
    return (None, _IterateResource.SOURCE)

  if not isinstance(source_resource, resource_reference.ManagedFolderResource):
    source_posix = posix_util.get_posix_attributes_from_resource(
        source_resource
    )
    if user_request_args.preserve_posix:
      posix_to_set = source_posix
    else:
      posix_to_set = posix_util.PosixAttributes(
          None, source_posix.mtime, None, None, None
      )
  else:
    posix_to_set = None

  if not destination_resource:
    return (
        _get_copy_task(
            user_request_args,
            source_resource,
            posix_to_set=posix_to_set,
            source_container=source_container,
            destination_container=destination_container,
            dry_run=dry_run,
            skip_unsupported=skip_unsupported,
        ),
        _IterateResource.SOURCE,
    )

  if ignore_symlinks and destination_resource.is_symlink:
    _log_skipping_symlink(destination_resource)
    return (None, _IterateResource.DESTINATION)

  source_url = _get_comparison_url(source_resource, source_container)
  destination_url = _get_comparison_url(
      destination_resource, destination_container
  )
  if source_url < destination_url:
    return (
        _get_copy_task(
            user_request_args,
            source_resource,
            posix_to_set=posix_to_set,
            source_container=source_container,
            destination_container=destination_container,
            dry_run=dry_run,
            skip_unsupported=skip_unsupported,
        ),
        _IterateResource.SOURCE,
    )

  if source_url > destination_url:
    return _get_task_and_iteration_for_unmatched_destination(
        user_request_args,
        destination_resource,
        delete_unmatched_destination_objects,
        use_gsutil_delete_unmatched_behaviour,
        dry_run,
        skip_unsupported,
    )

  if user_request_args.no_clobber:
    iteration_instruction = (
        _get_iterator_instruction_for_no_clobber_and_new_mtimes(
            delete_unmatched_destination_objects,
            use_gsutil_delete_unmatched_behaviour,
        )
    )

    return (None, iteration_instruction)

  if isinstance(source_resource, resource_reference.ManagedFolderResource):
    # No metadata diffing is performed for managed folders.
    return (
        _get_copy_task(
            user_request_args,
            source_resource,
            source_container=source_container,
            destination_resource=destination_resource,
            destination_container=destination_container,
            dry_run=dry_run,
            posix_to_set=None,
            skip_unsupported=skip_unsupported,
        ),
        _IterateResource.BOTH,
    )

  return _compare_equal_object_urls_to_get_task_and_iteration_instruction(
      user_request_args,
      source_resource,
      destination_resource,
      posix_to_set,
      compare_only_hashes=compare_only_hashes,
      dry_run=dry_run,
      skip_if_destination_has_later_modification_time=(
          skip_if_destination_has_later_modification_time
      ),
      skip_unsupported=skip_unsupported,
      delete_unmatched_destination_objects=delete_unmatched_destination_objects,
      use_gsutil_delete_unmatched_behaviour=(
          use_gsutil_delete_unmatched_behaviour
      ),
  )


def get_operation_iterator(
    user_request_args,
    source_list_file,
    source_container,
    destination_list_file,
    destination_container,
    compare_only_hashes=False,
    delete_unmatched_destination_objects=False,
    dry_run=False,
    ignore_symlinks=False,
    yield_managed_folder_operations=False,
    skip_if_destination_has_later_modification_time=False,
    skip_unsupported=False,
    task_status_queue=None,
):
  """Returns task with next rsync operation (patch, delete, copy, etc)."""
  operation_count = bytes_operated_on = 0
  with files.FileReader(source_list_file) as source_reader, files.FileReader(
      destination_list_file
  ) as destination_reader:
    source_resource = parse_csv_line_to_resource(
        next(source_reader, None),
        is_managed_folder=yield_managed_folder_operations,
    )
    destination_resource = parse_csv_line_to_resource(
        next(destination_reader, None),
        is_managed_folder=yield_managed_folder_operations,
    )

    while source_resource or destination_resource:
      task, iteration_instruction = _get_task_and_iteration_instruction(
          user_request_args,
          source_resource,
          source_container,
          destination_resource,
          destination_container,
          compare_only_hashes=compare_only_hashes,
          delete_unmatched_destination_objects=(
              delete_unmatched_destination_objects
          ),
          dry_run=dry_run,
          ignore_symlinks=ignore_symlinks,
          skip_if_destination_has_later_modification_time=(
              skip_if_destination_has_later_modification_time
          ),
          skip_unsupported=skip_unsupported,
      )
      if task:
        operation_count += 1
        if isinstance(task, copy_util.ObjectCopyTask):
          bytes_operated_on += source_resource.size or 0
        yield task
      if iteration_instruction in (
          _IterateResource.SOURCE,
          _IterateResource.BOTH,
      ):
        source_resource = parse_csv_line_to_resource(
            next(source_reader, None),
            is_managed_folder=yield_managed_folder_operations,
        )
      if iteration_instruction in (
          _IterateResource.DESTINATION,
          _IterateResource.BOTH,
      ):
        destination_resource = parse_csv_line_to_resource(
            next(destination_reader, None),
            is_managed_folder=yield_managed_folder_operations,
        )

  if task_status_queue and (operation_count or bytes_operated_on):
    progress_callbacks.workload_estimator_callback(
        task_status_queue, item_count=operation_count, size=bytes_operated_on
    )
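

# Usage sketch (hypothetical paths and arguments): callers typically write
# sorted CSV listings for both sides, then drain the iterator, scheduling each
# yielded task; with dry_run=True the walk only prints the intended actions:
#
#   for task in get_operation_iterator(
#       user_request_args,
#       source_list_file='/tmp/rsync_src.csv',
#       source_container=source_container,
#       destination_list_file='/tmp/rsync_dst.csv',
#       destination_container=destination_container,
#       dry_run=True,
#   ):
#       task.execute()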