HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/api_lib/storage/gcs_grpc/metadata_util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for normalizing gRPC messages."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import copy
import datetime
import sys

from cloudsdk.google.protobuf import json_format
from googlecloudsdk.api_lib.storage import metadata_util
from googlecloudsdk.api_lib.storage.gcs_json import metadata_util as json_metadata_util
from googlecloudsdk.command_lib.storage import encryption_util
from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import hash_util
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.storage import user_request_args_factory
from googlecloudsdk.command_lib.storage.resources import gcs_resource_reference
from googlecloudsdk.command_lib.util import crc32c


# pylint:disable=g-import-not-at-top
try:
  # TODO(b/277356731) Remove version check after gcloud drops Python <= 3.5.
  if sys.version_info.major == 3 and sys.version_info.minor > 5:
    from google.api_core.gapic_v1 import routing_header
except ImportError:
  pass
# pylint:enable=g-import-not-at-top


# gRPC bucket fields hold full resource names ("projects/_/buckets/<name>").
# Slicing with this offset recovers the plain bucket name.
GRPC_URL_BUCKET_OFFSET = len('projects/_/buckets/')


def _convert_repeated_message_to_dict(message):
  """Converts a sequence of proto messages to a dict."""
  if not message:
    return
  # TODO(b/262768337) Use message_to_dict translation once it's fixed
  # pylint: disable=protected-access
  return [json_format.MessageToDict(i._pb) for i in message]
  # pylint: enable=protected-access


def _convert_proto_to_datetime(proto_datetime):
  """Converts the proto.datetime_helpers.DatetimeWithNanoseconds to datetime."""
  if not proto_datetime:
    return
  return datetime.datetime.fromtimestamp(
      proto_datetime.timestamp(), proto_datetime.tzinfo)


def _get_value_or_none(value):
  """Returns None if value is falsy, else the value itself.

  Unlike Apitools messages, gRPC messages do not return None for fields that
  are not set. It will instead be set to a falsy value.

  Args:
    value (proto.Message): The proto message.

  Returns:
    None if the value is falsy, else the value itself.
  """
  if value:
    return value
  return None


def copy_object_metadata(
    source_metadata,
    destination_metadata,
    request_config,
    should_deep_copy=False,
):
  """Copies specific metadata from source_metadata to destination_metadata.

  The API manually generates metadata for destination objects most of the time,
  but there are some fields that may not be populated.

  Args:
    source_metadata (gapic_clients.storage_v2.types.storage.Object): Metadata
      from source object.
    destination_metadata (gapic_clients.storage_v2.types.storage.Object):
      Metadata for destination object.
    request_config (request_config_factory.RequestConfig): Holds context info
      about the copy operation.
    should_deep_copy (bool): Copy all metadata, removing fields the backend must
      generate and preserving destination address.

  Returns:
    New destination metadata with data copied from source (messages.Object).

  Raises:
    errors.Error: In the shallow-copy path, if ACL preservation was requested
      but the source object has no ACLs to copy.
  """
  if should_deep_copy:
    # Save the destination's address first: the deep copy of the source
    # replaces the destination message wholesale, and bucket/name must
    # continue to point at the destination object.
    destination_bucket = destination_metadata.bucket
    destination_name = destination_metadata.name
    destination_metadata = copy.deepcopy(source_metadata)
    destination_metadata.bucket = destination_bucket
    destination_metadata.name = destination_name
    # Some fields should be regenerated by the backend to avoid errors.
    destination_metadata.generation = None
    # TODO(b/324352239): Fix if required. There is no ID field in gRPC. Since we
    # need not specify it (we want it autogenerated), we perhaps do not need to
    # worry about it. Leaving this TODO to make sure we reconsider it based on
    # our tests before we wrap up the gRPC task.
    # destination_metadata.id = None
    # NOTE(review): the explicit `== False` looks deliberate — preserve_acl
    # presumably can hold non-boolean sentinel values as well, so a plain
    # truthiness or `is False` check may not be equivalent. Confirm against
    # request_config_factory before changing.
    # pylint:disable=g-explicit-bool-comparison,singleton-comparison
    if request_config.resource_args.preserve_acl == False:
      # pylint:enable=g-explicit-bool-comparison,singleton-comparison
      destination_metadata.acl = []
  else:
    if request_config.resource_args.preserve_acl:
      if not source_metadata.acl:
        raise errors.Error(
            'Attempting to preserve ACLs but found no source ACLs.'
        )
      # Deep-copy each ACL entry so later mutation of the source message
      # cannot leak into the destination.
      for source_acl in source_metadata.acl:
        destination_metadata.acl.append(copy.deepcopy(source_acl))
    destination_metadata.cache_control = source_metadata.cache_control
    destination_metadata.content_disposition = (
        source_metadata.content_disposition
    )
    destination_metadata.content_encoding = source_metadata.content_encoding
    destination_metadata.content_language = source_metadata.content_language
    destination_metadata.content_type = source_metadata.content_type
    if source_metadata.checksums:
      destination_metadata.checksums.crc32c = source_metadata.checksums.crc32c
      destination_metadata.checksums.md5_hash = (
          source_metadata.checksums.md5_hash
      )

    destination_metadata.custom_time = source_metadata.custom_time
    destination_metadata.metadata = copy.deepcopy(source_metadata.metadata)

  return destination_metadata


def get_grpc_metadata_from_url(cloud_url, grpc_types):
  """Builds the gRPC Types message matching a storage_url.CloudUrl.

  Args:
    cloud_url (storage_url.CloudUrl): URL identifying a bucket or object.
    grpc_types: Module providing the Bucket and Object message classes.

  Returns:
    A grpc_types.Bucket for bucket URLs, a grpc_types.Object for object
    URLs, or None for anything else.
  """
  if cloud_url.is_bucket():
    return grpc_types.Bucket(name=cloud_url.bucket_name)
  if cloud_url.is_object():
    object_generation = (
        int(cloud_url.generation) if cloud_url.generation else None
    )
    return grpc_types.Object(
        name=cloud_url.resource_name,
        bucket=cloud_url.bucket_name,
        generation=object_generation,
    )
  return None


def get_bucket_resource_from_metadata(metadata):
  """Helper method to generate a BucketResource instance from GCS metadata.

  Args:
    metadata (messages.Bucket): Extract resource properties from this.

  Returns:
    BucketResource with properties populated by metadata.
  """
  # TODO(b/324352239): This is a temporary implementation to unblock
  # direct connectivity diagnostic for Rapid. Ideally the data mapping should
  # be done in a generic way using a helper utility.
  # gRPC bucket names are full resource paths; strip "projects/_/buckets/".
  bucket_url = storage_url.CloudUrl(
      storage_url.ProviderPrefix.GCS, metadata.name[GRPC_URL_BUCKET_OFFSET:]
  )

  # Needed twice: inside the autoclass dict and as its own resource field.
  toggle_time = _convert_proto_to_datetime(metadata.autoclass.toggle_time)
  autoclass_dict = {
      'enabled': metadata.autoclass.enabled,
      'toggleTime': toggle_time,
      'terminalStorageClass': metadata.autoclass.terminal_storage_class,
      'terminalStorageClassUpdateTime': _convert_proto_to_datetime(
          metadata.autoclass.terminal_storage_class_update_time
      ),
  }
  ip_filter_dict = {
      'mode': metadata.ip_filter.mode,
      'publicNetworkSource': {
          'allowedIpCidrRanges': (
              metadata.ip_filter.public_network_source.allowed_ip_cidr_ranges
          ),
      },
      'vpcNetworkSources': _convert_repeated_message_to_dict(
          metadata.ip_filter.vpc_network_sources
      ),
      'allowCrossOrgVpcs': metadata.ip_filter.allow_cross_org_vpcs,
  }
  retention_dict = {
      'effectiveTime': _convert_proto_to_datetime(
          metadata.retention_policy.effective_time
      ),
      'retentionDuration': (
          metadata.retention_policy.retention_duration.total_seconds()
      ),
      'isLocked': metadata.retention_policy.is_locked,
  }
  soft_delete_dict = {
      'effectiveTime': _convert_proto_to_datetime(
          metadata.soft_delete_policy.effective_time
      ),
      'retentionDuration': (
          metadata.soft_delete_policy.retention_duration.total_seconds()
      ),
  }

  return gcs_resource_reference.GcsBucketResource(
      bucket_url,
      autoclass=autoclass_dict,
      acl=_convert_repeated_message_to_dict(metadata.acl),
      autoclass_enabled_time=toggle_time,
      cors_config=_convert_repeated_message_to_dict(metadata.cors),
      creation_time=_convert_proto_to_datetime(metadata.create_time),
      custom_placement_config={
          'dataLocations': metadata.custom_placement_config.data_locations
      },
      default_acl=_convert_repeated_message_to_dict(
          metadata.default_object_acl
      ),
      default_event_based_hold=metadata.default_event_based_hold,
      # The encryption message may not expose this attribute, hence getattr.
      default_kms_key=getattr(metadata.encryption, 'default_kms_key', None),
      default_storage_class=metadata.storage_class,
      etag=metadata.etag,
      ip_filter_config=ip_filter_dict,
      labels=metadata.labels,
      lifecycle_config={
          'rule': _convert_repeated_message_to_dict(metadata.lifecycle.rule),
      },
      location=metadata.location,
      location_type=metadata.location_type,
      logging_config={
          'logBucket': metadata.logging.log_bucket,
          'logObjectPrefix': metadata.logging.log_object_prefix,
      },
      metadata=metadata,
      metageneration=metadata.metageneration,
      per_object_retention={
          'mode': 'Enabled' if metadata.object_retention.enabled else None,
      },
      public_access_prevention=metadata.iam_config.public_access_prevention,
      requester_pays=metadata.billing.requester_pays,
      retention_policy=retention_dict,
      rpo=metadata.rpo,
      satisfies_pzs=metadata.satisfies_pzs,
      soft_delete_policy=soft_delete_dict,
      uniform_bucket_level_access=(
          metadata.iam_config.uniform_bucket_level_access.enabled
      ),
      update_time=_convert_proto_to_datetime(metadata.update_time),
      versioning_enabled=metadata.versioning.enabled,
      website_config={
          'mainPageSuffix': metadata.website.main_page_suffix,
          'notFoundPage': metadata.website.not_found_page,
      },
  )


def get_object_resource_from_grpc_object(grpc_object):
  """Returns the GCSObjectResource based off of the gRPC Object."""
  # Generation may be a 0 integer, which is valid although falsy, so the
  # check must be against None rather than truthiness.
  generation = (
      str(grpc_object.generation)
      if grpc_object.generation is not None
      else None
  )
  url = storage_url.CloudUrl(
      scheme=storage_url.ProviderPrefix.GCS,
      # bucket is of the form projects/_/buckets/<bucket_name>
      bucket_name=grpc_object.bucket[GRPC_URL_BUCKET_OFFSET:],
      resource_name=grpc_object.name,
      generation=generation,
  )

  decryption_key_hash_sha256 = encryption_algorithm = None
  customer_encryption = grpc_object.customer_encryption
  if customer_encryption and customer_encryption.key_sha256_bytes:
    decryption_key_hash_sha256 = hash_util.get_base64_string(
        customer_encryption.key_sha256_bytes
    )
    encryption_algorithm = customer_encryption.encryption_algorithm

  checksums = grpc_object.checksums
  if checksums.crc32c is None:
    crc32c_hash = None
  else:
    # crc32c can be 0, so check for None value specifically.
    crc32c_hash = crc32c.get_crc32c_hash_string_from_checksum(
        checksums.crc32c
    )
  md5_hash = (
      hash_util.get_base64_string(checksums.md5_hash)
      if checksums.md5_hash
      else None
  )

  return gcs_resource_reference.GcsObjectResource(
      url,
      acl=_convert_repeated_message_to_dict(grpc_object.acl),
      cache_control=_get_value_or_none(grpc_object.cache_control),
      component_count=_get_value_or_none(grpc_object.component_count),
      content_disposition=_get_value_or_none(grpc_object.content_disposition),
      content_encoding=_get_value_or_none(grpc_object.content_encoding),
      content_language=_get_value_or_none(grpc_object.content_language),
      content_type=_get_value_or_none(grpc_object.content_type),
      crc32c_hash=crc32c_hash,
      creation_time=_convert_proto_to_datetime(grpc_object.create_time),
      custom_fields=_get_value_or_none(grpc_object.metadata),
      custom_time=_convert_proto_to_datetime(grpc_object.custom_time),
      decryption_key_hash_sha256=decryption_key_hash_sha256,
      encryption_algorithm=encryption_algorithm,
      etag=_get_value_or_none(grpc_object.etag),
      event_based_hold=_get_value_or_none(grpc_object.event_based_hold),
      kms_key=_get_value_or_none(grpc_object.kms_key),
      md5_hash=md5_hash,
      metadata=grpc_object,
      metageneration=grpc_object.metageneration,
      noncurrent_time=_convert_proto_to_datetime(grpc_object.delete_time),
      retention_expiration=_convert_proto_to_datetime(
          grpc_object.retention_expire_time
      ),
      size=grpc_object.size,
      storage_class=_get_value_or_none(grpc_object.storage_class),
      storage_class_update_time=_convert_proto_to_datetime(
          grpc_object.update_storage_class_time
      ),
      temporary_hold=_get_value_or_none(grpc_object.temporary_hold),
      update_time=_convert_proto_to_datetime(grpc_object.update_time),
  )


def update_object_metadata_from_request_config(
    object_metadata,
    request_config,
    attributes_resource=None,
    posix_to_set=None,
):
  # pylint: disable=line-too-long
  """Sets GRPC Storage Object fields based on values in request config.

  Checksums such as md5 are not set because they are ignored if they are
  provided.

  Args:
    object_metadata (gapic_clients.storage_v2.types.storage.Object): Existing
      object metadata. Mutated in place; nothing is returned.
    request_config (request_config_factory._GcsRequestConfig): May contain data
      to add to object_metadata.
    attributes_resource (FileObjectResource|ObjectResource|None): Contains the
      source StorageUrl and source object metadata for daisy chain transfers.
      Can be None if source is pure stream
    posix_to_set (PosixAttributes|None): Set as custom metadata on target.
  """
  # pylint: enable=line-too-long
  resource_args = request_config.resource_args

  # Merge request-config custom metadata (and POSIX attributes, if given)
  # into the object's existing custom metadata.
  custom_fields_dict = metadata_util.get_updated_custom_fields(
      object_metadata.metadata,
      request_config,
      attributes_resource,
      known_posix=posix_to_set,
  )

  # process_value_or_clear_flag sets, clears, or leaves each field depending
  # on the supplied value (see gcs_json.metadata_util for the exact contract).
  json_metadata_util.process_value_or_clear_flag(
      object_metadata, 'metadata', custom_fields_dict
  )

  # Local gzipping changes what content_encoding/cache_control should be, so
  # decide that before computing either header value.
  should_gzip_locally = json_metadata_util.get_should_gzip_locally(
      attributes_resource, request_config
  )

  content_encoding = json_metadata_util.get_content_encoding(
      should_gzip_locally, resource_args
  )
  cache_control = json_metadata_util.get_cache_control(
      should_gzip_locally, resource_args
  )

  json_metadata_util.process_value_or_clear_flag(
      object_metadata, 'cache_control', cache_control
  )
  json_metadata_util.process_value_or_clear_flag(
      object_metadata, 'content_encoding', content_encoding
  )

  # Everything below reads directly from resource_args, so stop here when the
  # request carried none.
  if not resource_args:
    return

  json_metadata_util.process_value_or_clear_flag(
      object_metadata, 'content_disposition', resource_args.content_disposition
  )
  json_metadata_util.process_value_or_clear_flag(
      object_metadata, 'content_language', resource_args.content_language
  )
  json_metadata_util.process_value_or_clear_flag(
      object_metadata, 'content_type', resource_args.content_type
  )
  json_metadata_util.process_value_or_clear_flag(
      object_metadata, 'custom_time', resource_args.custom_time
  )

  # Encryption handling.
  if resource_args.encryption_key:
    if (
        resource_args.encryption_key == user_request_args_factory.CLEAR
        or resource_args.encryption_key.type is encryption_util.KeyType.CSEK
    ):
      object_metadata.kms_key = None
      # For CSEK, set the encryption in API request headers instead.
      object_metadata.customer_encryption = None
    elif resource_args.encryption_key.type is encryption_util.KeyType.CMEK:
      object_metadata.kms_key = resource_args.encryption_key.key


def get_bucket_name_routing_header(bucket_name):
  """Gets routing header with bucket.

  Args:
    bucket_name (str): Name of the bucket.

  Returns:
    (List[Tuple[str, str]]) List with metadata.
  """
  # NOTE: routing_header is only available when the conditional import at the
  # top of this module succeeded (Python > 3.5 with google-api-core present).
  routing_params = {'bucket': bucket_name}
  return [routing_header.to_grpc_metadata(routing_params)]