HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/storage/fast_crc32c_util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities that augment the core CRC32C functionality for storage commands.

The core CRC32C utility provides a hashlib-like functionality for CRC32C
calculation but will at times fall back to a slow, all-Python implementation.
This utility provides several mitigation strategies to avoid relying on the slow
implementation of CRC32C, including adding a "deferred" strategy that uses the
component gcloud-crc32c on files after they are downloaded.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os
import struct
import textwrap

from googlecloudsdk.command_lib import info_holder
from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.util import crc32c
# TODO(b/243537215) Should be loaded from a more generic location
from googlecloudsdk.command_lib.util.anthos import binary_operations
from googlecloudsdk.core import log
from googlecloudsdk.core import properties


# Name of the gcloud-crc32c component binary. Used both to run the hashing
# operation and to look the binary up when checking whether it is installed.
BINARY_NAME = 'gcloud-crc32c'


class GcloudCrc32cOperation(binary_operations.BinaryBackedOperation):
  """Binary-backed operation that runs gcloud-crc32c against a file."""

  def __init__(self, **kwargs):
    """Binds the operation to the gcloud-crc32c binary.

    Args:
      **kwargs: Forwarded unchanged to the BinaryBackedOperation initializer.
    """
    super(GcloudCrc32cOperation, self).__init__(binary=BINARY_NAME, **kwargs)

  def _ParseArgsForCommand(self, file_path, offset=0, length=0, **kwargs):
    """Builds the argument list handed to the binary.

    Args:
      file_path (str): Path of the file to hash.
      offset (int): Value passed with the `-o` flag.
      length (int): Value passed with the `-l` flag.
      **kwargs: Accepted for interface compatibility; not used.

    Returns:
      A list of strings: the `-o` and `-l` flags with their values, followed
      by the file path.
    """
    range_flags = ['-o', str(offset), '-l', str(length)]
    return range_flags + [file_path]


class DeferredCrc32c(object):
  """Hashlib-like helper for deferring hash calculations to gcloud-crc32c.

  NOTE: Given this class relies on analyzing data on disk, it is not appropriate
  for hashing streaming downloads and will fail to work as expected.
  """

  def __init__(self, crc=0):
    """Sets up the internal checksum variable and allows an initial value.

    Args:
      crc (int): The initial checksum to be stored.
    """
    self._crc = crc

  def copy(self):
    """Returns a new DeferredCrc32c holding the same checksum value."""
    return DeferredCrc32c(crc=self._crc)

  def update(self, data):
    """Accepts data for hashlib compatibility but intentionally ignores it.

    Args:
      data (bytes): Ignored. The checksum is only ever set by sum_file.
    """
    # Does nothing so hash calculation can be deferred to sum_file.
    del data  # Unused.
    return

  def sum_file(self, file_path, offset=0, length=0):
    """Calculates checksum on a provided file path.

    The stored checksum is replaced by the value printed by gcloud-crc32c. If
    the operation reports failure, the stored checksum is reset to 0.

    Args:
      file_path (str): A string representing a path to a file.
      offset (int): The number of bytes to offset from the beginning of the
        file. Defaults to 0.
      length (int): The number of bytes to read into the file. The default of
        0 is how the binary is told that no length was given, in which case
        the checksum is calculated until the end of file is encountered.

    Raises:
      errors.Error: If offset or length is None.
    """
    if offset is None or length is None:
      raise errors.Error(
          'gcloud_crc32c binary uses 0 (not `None`) to indicate'
          ' "no argument given."'
      )
    crc32c_operation = GcloudCrc32cOperation()
    result = crc32c_operation(file_path=file_path, offset=offset, length=length)
    # The binary's stdout is parsed as the integer checksum.
    self._crc = 0 if result.failed else int(result.stdout)

  def digest(self):
    """Returns the checksum in big-endian order, per RFC 4960.

    See: https://cloud.google.com/storage/docs/json_api/v1/objects#crc32c

    Returns:
      A four-byte digest (the checksum packed as a big-endian unsigned 32-bit
      integer).
    """
    return struct.pack('>L', self._crc)

  def hexdigest(self):
    """Returns a checksum like `digest` except as a bytestring of double length.

    Returns:
      An eight-byte digest string, containing only hex digits.
    """
    return '{:08x}'.format(self._crc).encode('ascii')


def _is_gcloud_crc32c_installed():
  """Reports whether gcloud-crc32c is already installed.

  Unlike _check_if_gcloud_crc32c_available, this never requests installation.

  Returns:
    True if the lookup result contains the binary name; False if it does not
    or if the lookup raises MissingExecutableException.
  """
  installed = False
  try:
    lookup_result = binary_operations.CheckForInstalledBinary(BINARY_NAME)
    installed = BINARY_NAME in lookup_result
  except binary_operations.MissingExecutableException:
    pass
  return installed


def _check_if_gcloud_crc32c_available(install_if_missing=False):
  """Checks for gcloud-crc32c, optionally requesting installation.

  Args:
    install_if_missing (bool): Forwarded to the installed-binary check so that
      it may attempt installation when the binary is absent.

  Returns:
    True if the lookup result contains the binary name, otherwise False. Any
    exception raised by the check results in False rather than propagating.
  """
  is_available = False
  try:
    lookup_result = binary_operations.CheckForInstalledBinary(
        BINARY_NAME, install_if_missing=install_if_missing
    )
    is_available = BINARY_NAME in lookup_result
  except binary_operations.MissingExecutableException:
    # Either the user can reach the gcloud components manager but declined the
    # install, or they cannot reach it at all. Turning the property off stops
    # future automatic install attempts; a separately installed gcloud-crc32c
    # can still be picked up later.
    log.debug(
        'gcloud-crc32c can not be found on the path. The user may have opted'
        ' out of installation or lack component manager access.'
    )
    properties.VALUES.storage.use_gcloud_crc32c.Set(False)
  except:  # pylint: disable=bare-except
    # Deliberately broad: any other failure during the installation check is
    # treated as non-fatal.
    log.debug('Failed to check if gcloud-crc32c is available.')
  return is_available


def check_if_will_use_fast_crc32c(install_if_missing=False):
  """Reports whether a fast CRC32C implementation will be used.

  Args:
    install_if_missing (bool): Forwarded to the gcloud-crc32c availability
      check, which is only consulted when the fast google-crc32c flag is unset
      and the use_gcloud_crc32c property is not explicitly False.

  Returns:
    A truthy value if the fast google-crc32c flag is set or gcloud-crc32c is
    available and not disabled by property; a falsy value otherwise.
  """
  fast_module_available = crc32c.IS_FAST_GOOGLE_CRC32C_AVAILABLE
  if fast_module_available:
    return fast_module_available

  # Only an explicit False disables gcloud-crc32c; an unset (None) property
  # still allows it, hence the identity comparison.
  # pylint:disable=g-bool-id-comparison
  if properties.VALUES.storage.use_gcloud_crc32c.GetBool() is False:
    # pylint:enable=g-bool-id-comparison
    return False
  return _check_if_gcloud_crc32c_available(install_if_missing)


def should_use_gcloud_crc32c(install_if_missing=False):
  """Decides whether hashing should go through the gcloud-crc32c binary.

  Args:
    install_if_missing (bool): Install gcloud-crc32c if not already present.

  Returns:
    True if the Go binary gcloud-crc32c should be used.
  """
  preference = properties.VALUES.storage.use_gcloud_crc32c.GetBool()
  # Two reasons to skip the binary: the user explicitly turned it off, or the
  # user expressed no preference and the google-crc32c module can be relied on.
  # pylint:disable=g-bool-id-comparison
  if preference is False or (
      preference is None and crc32c.IS_FAST_GOOGLE_CRC32C_AVAILABLE
  ):
    # pylint:enable=g-bool-id-comparison
    return False

  return (
      _check_if_gcloud_crc32c_available(install_if_missing=True)
      if install_if_missing
      else _is_gcloud_crc32c_installed()
  )


def get_crc32c(initial_data=b''):
  """Returns a CRC32C hasher, preferring deferral to gcloud-crc32c.

  DO NOT USE for streaming downloads, as this relies on file-based hashing and
  does not take whether or not streaming is enabled into account.

  Args:
    initial_data (bytes): The CRC32C object will be initialized with the
      checksum of the data. Not used when a DeferredCrc32c is returned.

  Returns:
    A DeferredCrc32c instance if hashing can be deferred. Otherwise it returns a
    google_crc32c.Checksum instance if google-crc32c
    (https://github.com/googleapis/python-crc32c) is available and a
    predefined.Crc instance from crcmod library if not.
  """
  if should_use_gcloud_crc32c(install_if_missing=True):
    return DeferredCrc32c()
  return crc32c.get_crc32c(initial_data)


def get_google_crc32c_install_command():
  """Builds the pip command that installs the google-crc32c library.

  This will typically only be called if gcloud-crc32c is missing and can't be
  installed for some reason. It requires user intervention which is why it's
  not a preferred option.

  Returns:
    The install command as a string, targeting the SDK's lib/third_party
    directory, or None when the SDK root is not known.
  """
  holder = info_holder.InfoHolder()
  root_dir = holder.installation.sdk_root
  if not root_dir:
    return None

  target_dir = os.path.join(root_dir, 'lib', 'third_party')
  command_template = '{} -m pip install google-crc32c --upgrade --target {}'
  return command_template.format(holder.basic.python_location, target_dir)


def _get_hash_check_warning_base():
  """Builds the template text shared by the CRC32C warnings and error.

  The suggested install step is the google-crc32c pip command when one can be
  built, otherwise the gcloud components command for gcloud-crc32c.

  Returns:
    A str with exactly one `{}` placeholder left for the caller to fill in via
    `str.format` with the phrase describing what happens to the copy.
  """
  # Create the text in a function so that we can test it easily.
  install_step = (
      get_google_crc32c_install_command()
      or 'gcloud components install gcloud-crc32c'
  )
  # The returned text is run through `str.format` a second time by its users.
  # Literal braces in the install step (for example from a filesystem path)
  # must therefore be doubled, or that second call would treat them as
  # replacement fields and raise.
  escaped_step = install_step.replace('{', '{{').replace('}', '}}')
  return textwrap.dedent(
      """\
      This copy {{}} since fast hash calculation tools
      are not installed. You can change this by running:
      \t$ {crc32c_step}
      You can also modify the "storage/check_hashes" config setting.""".format(
          crc32c_step=escaped_step
      )
  )


# Built once when the module is loaded. Each message below fills the single
# placeholder in the shared base text with a phrase describing what happens to
# the copy.
_HASH_CHECK_WARNING_BASE = _get_hash_check_warning_base()
_SLOW_HASH_CHECK_WARNING = _HASH_CHECK_WARNING_BASE.format('may be slow')
_NO_HASH_CHECK_ERROR = _HASH_CHECK_WARNING_BASE.format('was skipped')
_NO_HASH_CHECK_WARNING = _HASH_CHECK_WARNING_BASE.format(
    'will not be validated'
)


def log_or_raise_crc32c_issues(warn_for_always=True):
  """Warns or fails when no fast CRC32C implementation will be used.

  Does nothing when a fast implementation is available (installing
  gcloud-crc32c if needed). Otherwise the action depends on the
  storage/check_hashes property.

  Args:
    warn_for_always (bool): User may not want to see a warning about slow hashes
      if they have the "always check hashes" property set because (1) they
      intentionally set a property and (2) it could duplicate a warning in
      FileDownloadTask.

  Raises:
    errors.Error: IF_FAST_ELSE_FAIL set, and CRC32C binary not present. See
      error message for more details.
  """
  if check_if_will_use_fast_crc32c(install_if_missing=True):
    return

  hash_policy = properties.VALUES.storage.check_hashes.Get()
  policies = properties.CheckHashes
  if warn_for_always and hash_policy == policies.ALWAYS.value:
    # Hashes will still be checked, just with a slow implementation.
    log.warning(_SLOW_HASH_CHECK_WARNING)
  elif hash_policy == policies.IF_FAST_ELSE_SKIP.value:
    log.warning(_NO_HASH_CHECK_WARNING)
  elif hash_policy == policies.IF_FAST_ELSE_FAIL.value:
    raise errors.Error(_NO_HASH_CHECK_ERROR)