HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/storage/hash_util.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Hashing utilities for storage commands."""

from __future__ import absolute_import
from __future__ import annotations
from __future__ import division
from __future__ import unicode_literals

import base64
import enum

from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import fast_crc32c_util
from googlecloudsdk.core.updater import installers
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import hashing


class HashAlgorithm(enum.Enum):
  """Algorithms available for hashing data.

  The helpers in this module compare against these members to decide which
  hash object to construct.
  """

  # MD5 digest; get_hash_object builds it via hashing.get_md5().
  MD5 = 'md5'
  # CRC32C checksum; get_hash_object builds it via
  # fast_crc32c_util.get_crc32c().
  CRC32C = 'crc32c'


def get_base64_string(hash_bytes):
  """Base64-encodes the given bytes and returns the result as a str."""
  encoded_bytes = base64.b64encode(hash_bytes)
  return encoded_bytes.decode(encoding='utf-8')


def get_bytes_from_base64_string(hash_string):
  """Decodes a base64-encoded str and returns the raw bytes it represents."""
  return base64.b64decode(hash_string.encode('utf-8'))


def get_base64_hash_digest_string(hash_object):
  """Returns the base64-encoded digest of a hashlib-style object as a str."""
  raw_digest = hash_object.digest()
  return get_base64_string(raw_digest)


def get_hash_object(hash_algorithm: HashAlgorithm):
  """Creates a fresh hash object for the requested algorithm.

  Args:
    hash_algorithm (HashAlgorithm): Algorithm to create a hash object for.

  Returns:
    A new MD5 or CRC32C hash object, or None if the algorithm is neither
    HashAlgorithm.MD5 nor HashAlgorithm.CRC32C.
  """
  digester = None
  if hash_algorithm == HashAlgorithm.MD5:
    digester = hashing.get_md5()
  elif hash_algorithm == HashAlgorithm.CRC32C:
    digester = fast_crc32c_util.get_crc32c()
  return digester


def _get_hash_for_deferred_crc32c(
    path: str,
    hash_object: fast_crc32c_util.DeferredCrc32c,
    start=None,
    stop=None,
) -> fast_crc32c_util.DeferredCrc32c:
  """Runs a deferred CRC32C calculation over a byte range of a file.

  Args:
    path (str): Path of the file to hash.
    hash_object (fast_crc32c_util.DeferredCrc32c): Object whose sum_file method
      performs the calculation. Anything that is not a DeferredCrc32c instance
      is returned untouched.
    start (int|None): Byte index to start hashing at. None means 0.
    stop (int|None): Byte index to stop hashing at. When None, a length of 0 is
      passed to sum_file.

  Returns:
    The same hash_object that was passed in (not an integer checksum).
  """
  if isinstance(hash_object, fast_crc32c_util.DeferredCrc32c):
    offset = 0 if start is None else start
    # NOTE(review): length=0 presumably tells sum_file to hash through the end
    # of the file -- confirm against DeferredCrc32c.sum_file.
    length = 0 if stop is None else stop - offset
    hash_object.sum_file(path, offset=offset, length=length)
  return hash_object


def get_hash_from_data_chunk_or_file(
    path: str, data: bytes, hash_algorithm: HashAlgorithm, start=None, stop=None
):
  """Hashes either an in-memory data chunk or a byte range of a file.

  Which input is used depends on the hash object created for the algorithm:
  a DeferredCrc32c object is computed from the file at `path` (limited to the
  start/stop range), while any other hash object is fed `data` directly.

  Args:
    path (str): The file path, used only for deferred CRC32C hashing.
    data (bytes): The data chunk, used for every other hash object.
    hash_algorithm (HashAlgorithm): The algorithm to use for hashing.
    start (int|None): Optional byte index to start hashing from.
    stop (int|None): Optional byte index to stop hashing at.

  Returns:
    A hash object or None if the algorithm is not supported.
  """
  digester = get_hash_object(hash_algorithm)
  if isinstance(digester, fast_crc32c_util.DeferredCrc32c):
    return _get_hash_for_deferred_crc32c(path, digester, start, stop)
  if digester is not None:
    digester.update(data)
  return digester


def get_hash_from_file(path, hash_algorithm, start=None, stop=None):
  """Reads file and returns its hash object.

  core.util.files.Checksum does similar things but is different enough to merit
  this function. The primary differences are that this function:
  -Uses a FIPS-safe MD5 object.
  -Accommodates gcloud_crc32c, which uses a Go binary for hashing.
  -Supports start and end index to set byte range for hashing.

  Args:
    path (str): File to read.
    hash_algorithm (HashAlgorithm): Algorithm to hash file with.
    start (int|None): Byte index to start hashing at. None means the beginning
      of the file.
    stop (int|None): Stop hashing at this byte index (exclusive). None means
      hash through the end of the file.

  Returns:
    Hash object for file, or None if the algorithm is not supported.
  """
  hash_object = get_hash_object(hash_algorithm)
  if hash_object is None:
    return None

  if isinstance(hash_object, fast_crc32c_util.DeferredCrc32c):
    return _get_hash_for_deferred_crc32c(path, hash_object, start, stop)

  with files.BinaryFileReader(path) as stream:
    # Compare against None rather than relying on truthiness so that an
    # explicit index of 0 is treated as a real boundary.
    if start is not None:
      stream.seek(start)
    while True:
      # Read in bounded chunks to avoid holding all of file in memory at once.
      if stop is None:
        bytes_to_read = installers.WRITE_BUFFER_SIZE
      else:
        remaining = stop - stream.tell()
        if remaining <= 0:
          # Reached (or started beyond) the stop index. Breaking here also
          # keeps a negative size from ever being passed to read().
          break
        bytes_to_read = min(installers.WRITE_BUFFER_SIZE, remaining)

      data = stream.read(bytes_to_read)
      if not data:
        # End of file.
        break

      if isinstance(data, str):
        # read() can return strings or bytes. Hash objects need bytes.
        data = data.encode('utf-8')
      # Folds this chunk into the running hash.
      hash_object.update(data)

  return hash_object


def validate_object_hashes_match(object_path, source_hash, destination_hash):
  """Raises an error if a copied object's hash differs from its source's.

  Args:
    object_path (str): URL of object being validated.
    source_hash (str): Hash of source object.
    destination_hash (str): Hash of destination object.

  Raises:
    HashMismatchError: Hashes are not equal.
  """
  if source_hash != destination_hash:
    message = (
        'Source hash {} does not match destination hash {}'
        ' for object {}.'
    ).format(source_hash, destination_hash, object_path)
    raise errors.HashMismatchError(message)


def update_digesters(digesters, data):
  """Feeds the given data to each hash object held in the digesters dict."""
  for digester in digesters.values():
    digester.update(data)


def copy_digesters(digesters):
  """Returns a new dict holding a .copy() of each digester.

  Each hash object is duplicated through its own copy() method because
  deepcopying does not work on them.
  """
  return {
      hash_algorithm: digester.copy()
      for hash_algorithm, digester in digesters.items()
  }


def reset_digesters(digesters):
  """Replaces every hash object in the digesters dict with a fresh one.

  The dict is modified in place; its keys are left unchanged.

  Args:
    digesters (dict): Maps HashAlgorithm members to hash objects.

  Raises:
    Error: A key is neither HashAlgorithm.MD5 nor HashAlgorithm.CRC32C.
  """
  for algorithm in digesters:
    if algorithm is HashAlgorithm.MD5:
      fresh_digester = hashing.get_md5()
    elif algorithm is HashAlgorithm.CRC32C:
      fresh_digester = fast_crc32c_util.get_crc32c()
    else:
      raise errors.Error(
          'Unknown hash algorithm found in digesters: {}'.format(algorithm)
      )
    digesters[algorithm] = fresh_digester