HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/storage/gcs_json/metadata_util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tools for making the most of GcsApi metadata."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import copy
import datetime
import enum
from typing import List

from apitools.base.py import encoding
from apitools.base.py import encoding_helper
from googlecloudsdk.api_lib.storage import metadata_util
from googlecloudsdk.api_lib.storage import request_config_factory
from googlecloudsdk.api_lib.storage.gcs_json import metadata_field_converters
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.storage import encryption_util
from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import gzip_util
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.storage import user_request_args_factory
from googlecloudsdk.command_lib.storage.resources import gcs_resource_reference
from googlecloudsdk.command_lib.storage.resources import resource_reference
from googlecloudsdk.command_lib.storage.resources import resource_util
from googlecloudsdk.core import properties


# Similar to CORS above, we need a sentinel value allowing us to specify
# when a default object ACL should be private (containing no entries).
# A defaultObjectAcl value of [] means don't modify the default object ACL.
# A value of [PRIVATE_DEFAULT_OBJ_ACL] means create an empty/private default
# object ACL.
PRIVATE_DEFAULT_OBJECT_ACL = apis.GetMessagesModule(
    'storage', 'v1').ObjectAccessControl(id='PRIVATE_DEFAULT_OBJ_ACL')
CONTEXT_TYPE_LITERAL = 'type'
CONTEXT_VALUE_LITERAL = 'value'
CONTEXT_CREATE_TIME_LITERAL = 'createTime'
CONTEXT_UPDATE_TIME_LITERAL = 'updateTime'

_NO_TRANSFORM = 'no-transform'


def _message_to_dict(message):
  """Converts message to dict. Returns None is message is None."""
  if message is not None:
    result = encoding.MessageToDict(message)
    # Explicit comparison is needed because we don't want to return None for
    # False values.
    if result == []:  # pylint: disable=g-explicit-bool-comparison
      return None
    return result
  return None


class ContextType(enum.Enum):
  """Used to distinguish between custom and google generated contexts."""
  CUSTOM = 'CUSTOM'


class MethodType(enum.Enum):
  """Configuration for copying metadata."""
  OBJECT_PATCH = 'object_patch'
  OBJECT_REWRITE = 'object_rewrite'
  OBJECT_COMPOSE = 'object_compose'
  OBJECT_INSERT = 'object_insert'


def get_context_value_dict_from_value(value):
  """Returns the context value dict based on the value.

  Args:
    value (str): The value to be parsed.
  """
  return {CONTEXT_VALUE_LITERAL: value}


def copy_object_metadata(source_metadata,
                         destination_metadata,
                         request_config,
                         should_deep_copy=False,
                         method_type=MethodType.OBJECT_REWRITE):
  """Copies specific metadata from source_metadata to destination_metadata.

  The API manually generates metadata for destination objects most of the time,
  but there are some fields that may not be populated.

  Args:
    source_metadata (messages.Object): Metadata from source object.
    destination_metadata (messages.Object): Metadata for destination object.
    request_config (request_config_factory.RequestConfig): Holds context info
      about the copy operation.
    should_deep_copy (bool): Copy all metadata, removing fields the
      backend must generate and preserving destination address.
    method_type (MethodType): The type of method being used to copy the object,
        defaulting to MethodType.OBJECT_REWRITE.

  Returns:
    New destination metadata with data copied from source (messages.Object).
  """
  if should_deep_copy:
    destination_bucket = destination_metadata.bucket
    destination_name = destination_metadata.name
    destination_metadata = copy.deepcopy(source_metadata)
    destination_metadata.bucket = destination_bucket
    destination_metadata.name = destination_name
    # Some fields should be regenerated by the backend to avoid errors.
    destination_metadata.generation = None
    destination_metadata.id = None
    # pylint:disable=g-explicit-bool-comparison,singleton-comparison
    if request_config.resource_args.preserve_acl == False:
      # pylint:enable=g-explicit-bool-comparison,singleton-comparison
      destination_metadata.acl = []
  else:
    if request_config.resource_args.preserve_acl:
      if not source_metadata.acl:
        raise errors.Error(
            'Attempting to preserve ACLs but found no source ACLs.'
        )
      destination_metadata.acl = copy.deepcopy(source_metadata.acl)
    destination_metadata.cacheControl = source_metadata.cacheControl
    destination_metadata.contentDisposition = source_metadata.contentDisposition
    destination_metadata.contentEncoding = source_metadata.contentEncoding
    destination_metadata.contentLanguage = source_metadata.contentLanguage
    destination_metadata.contentType = source_metadata.contentType
    destination_metadata.crc32c = source_metadata.crc32c
    destination_metadata.customTime = source_metadata.customTime
    destination_metadata.md5Hash = source_metadata.md5Hash
    destination_metadata.metadata = copy.deepcopy(source_metadata.metadata)
    if method_type != MethodType.OBJECT_COMPOSE:
      # Currently this method is being called for preparing request for
      # object.compose, and object.rewrite.
      # The compose method pick up certain attributes from the first object
      # speicified in the compose request, and ignore the rest,
      # but we do not want this in case of object contexts, because instead we
      # want the merged contexts from all the objects present in the compose
      # request.
      destination_metadata.contexts = copy.deepcopy(source_metadata.contexts)

  return destination_metadata


def get_apitools_metadata_from_url(cloud_url):
  """Takes storage_url.CloudUrl and returns appropriate Apitools message."""
  messages = apis.GetMessagesModule('storage', 'v1')
  if cloud_url.is_bucket():
    return messages.Bucket(name=cloud_url.bucket_name)
  elif cloud_url.is_object():
    generation = int(cloud_url.generation) if cloud_url.generation else None
    return messages.Object(
        name=cloud_url.resource_name,
        bucket=cloud_url.bucket_name,
        generation=generation)


def get_bucket_resource_from_metadata(metadata):
  """Helper method to generate a BucketResource instance from GCS metadata.

  Args:
    metadata (messages.Bucket): Extract resource properties from this.

  Returns:
    BucketResource with properties populated by metadata.
  """
  url = storage_url.CloudUrl(
      scheme=storage_url.ProviderPrefix.GCS, bucket_name=metadata.name)

  if metadata.autoclass and metadata.autoclass.enabled:
    autoclass_enabled_time = metadata.autoclass.toggleTime
  else:
    autoclass_enabled_time = None

  uniform_bucket_level_access = getattr(
      getattr(metadata.iamConfiguration, 'uniformBucketLevelAccess', None),
      'enabled', None)

  return gcs_resource_reference.GcsBucketResource(
      url,
      acl=_message_to_dict(metadata.acl),
      autoclass=_message_to_dict(metadata.autoclass),
      autoclass_enabled_time=autoclass_enabled_time,
      cors_config=_message_to_dict(metadata.cors),
      creation_time=metadata.timeCreated,
      custom_placement_config=_message_to_dict(metadata.customPlacementConfig),
      default_acl=_message_to_dict(metadata.defaultObjectAcl),
      default_event_based_hold=metadata.defaultEventBasedHold or None,
      default_kms_key=getattr(metadata.encryption, 'defaultKmsKeyName', None),
      default_storage_class=metadata.storageClass,
      etag=metadata.etag,
      labels=_message_to_dict(metadata.labels),
      ip_filter_config=_message_to_dict(metadata.ipFilter),
      lifecycle_config=_message_to_dict(metadata.lifecycle),
      location=metadata.location,
      location_type=metadata.locationType,
      logging_config=_message_to_dict(metadata.logging),
      metadata=metadata,
      generation=metadata.generation,
      metageneration=metadata.metageneration,
      per_object_retention=_message_to_dict(metadata.objectRetention),
      project_number=metadata.projectNumber,
      public_access_prevention=getattr(
          metadata.iamConfiguration, 'publicAccessPrevention', None
      ),
      requester_pays=getattr(metadata.billing, 'requesterPays', None),
      retention_policy=_message_to_dict(metadata.retentionPolicy),
      rpo=metadata.rpo,
      satisfies_pzs=metadata.satisfiesPZS,
      soft_delete_policy=_message_to_dict(metadata.softDeletePolicy),
      uniform_bucket_level_access=uniform_bucket_level_access,
      update_time=metadata.updated,
      soft_delete_time=metadata.softDeleteTime,
      hard_delete_time=metadata.hardDeleteTime,
      versioning_enabled=getattr(metadata.versioning, 'enabled', None),
      website_config=_message_to_dict(metadata.website),
  )


def get_metadata_from_bucket_resource(resource):
  """Helper method to generate Apitools metadata instance from BucketResource.

  Args:
    resource (BucketResource): Extract metadata properties from this.

  Returns:
    messages.Bucket with properties populated by resource.
  """
  messages = apis.GetMessagesModule('storage', 'v1')
  metadata = messages.Bucket(
      name=resource.name,
      etag=resource.etag,
      location=resource.location,
      storageClass=resource.default_storage_class)

  if resource.retention_period:
    metadata.retentionPolicy = messages.Bucket.RetentionPolicyValue(
        retentionPeriod=resource.retention_period)
  if resource.uniform_bucket_level_access:
    metadata.iamConfiguration = messages.Bucket.IamConfigurationValue(
        uniformBucketLevelAccess=messages.Bucket.IamConfigurationValue
        .UniformBucketLevelAccessValue(
            enabled=resource.uniform_bucket_level_access))

  return metadata


def get_anywhere_cache_resource_from_metadata(metadata):
  url = storage_url.CloudUrl(
      scheme=storage_url.ProviderPrefix.GCS,
      bucket_name=metadata.bucket,
      resource_name=metadata.anywhereCacheId,
  )
  return gcs_resource_reference.GcsAnywhereCacheResource(
      admission_policy=metadata.admissionPolicy,
      anywhere_cache_id=metadata.anywhereCacheId,
      bucket=metadata.bucket,
      create_time=metadata.createTime,
      id_string=metadata.id,
      kind=metadata.kind,
      metadata=metadata,
      pending_update=metadata.pendingUpdate,
      state=metadata.state,
      storage_url=url,
      ttl=metadata.ttl,
      update_time=metadata.updateTime,
      zone=metadata.zone,
  )


def parse_custom_contexts_dict_from_resource_contexts_dict(
    resource_contexts_dict,
):
  """Parses custom contexts from resource contexts for request purposes.

  Examples:
    resource_contexts_dict={
        'key': {
            'value': 'value',
            'createTime': '2025-01-01T00:00:00Z',
            'updateTime': '2025-01-01T00:00:00Z',
            'type': 'CUSTOM'
        }
        'key2': {
            'value': 'value2',
            'createTime': '2025-01-01T00:00:00Z',
            'updateTime': '2025-01-01T00:00:00Z',
            'type': 'GENERATED'
        }
    } would return the following dictionary:
    {
        'custom': {
          'key': {
              'value': 'value',
          }
        }
    }

  Note: the method intentionally drops all other fields other than the context
  value, as they are internal only fields, or not meant to be sent to the API.

  Args:
    resource_contexts_dict (dict): The resource contexts dict to parse.

  Returns:
    a dictionary of contexts in apitools format for request purposes.
  """
  if resource_contexts_dict is None:
    return resource_contexts_dict
  custom_contexts_dict = {}
  for key, value in resource_contexts_dict.items():
    context_type = value.get(CONTEXT_TYPE_LITERAL, None)
    context_value = value.get(CONTEXT_VALUE_LITERAL, None)
    if context_type == ContextType.CUSTOM.value:
      custom_contexts_dict[key] = (
          get_context_value_dict_from_value(context_value)
      )
  return get_contexts_dict_from_custom_contexts_dict(custom_contexts_dict)


def _parse_contexts_from_object_metadata(metadata):
  """Returns parsed contexts from apitools object metadata.

  apitools object metadata separates custom contexts, and google generated
  contexts (coming in 2026) into separate maps, However CLI along with other
  clients uses a single map with a type field to distinguish between the two.
  This method parses the apitools object metadata into the single map format
  used by CLI and other clients.

  Args:
    metadata (messages.Object): The object metadata to parse contexts from.
  """
  if not (metadata.contexts and metadata.contexts.custom):
    return None

  custom_contexts = _message_to_dict(metadata.contexts.custom)
  parsed_contexts = {}
  for key, value in custom_contexts.items():
    context_value_dict = {}
    for field in [CONTEXT_CREATE_TIME_LITERAL, CONTEXT_UPDATE_TIME_LITERAL]:
      if field in value:
        context_value_dict[field] = (
            resource_util.get_formatted_string_from_datetime_object(
                datetime.datetime.fromisoformat(value[field])
            )
        )
    context_value_dict[CONTEXT_VALUE_LITERAL] = value[CONTEXT_VALUE_LITERAL]
    context_value_dict[CONTEXT_TYPE_LITERAL] = ContextType.CUSTOM.value
    parsed_contexts[key] = context_value_dict
  return parsed_contexts


def get_object_resource_from_metadata(metadata):
  """Helper method to generate a ObjectResource instance from GCS metadata.

  Args:
    metadata (messages.Object): Extract resource properties from this.

  Returns:
    ObjectResource with properties populated by metadata.
  """
  if metadata.generation is not None:
    # Generation may be 0 integer, which is valid although falsy.
    generation = str(metadata.generation)
  else:
    generation = None
  url = storage_url.CloudUrl(
      scheme=storage_url.ProviderPrefix.GCS,
      bucket_name=metadata.bucket,
      resource_name=metadata.name,
      generation=generation)

  if metadata.customerEncryption:
    decryption_key_hash_sha256 = metadata.customerEncryption.keySha256
    encryption_algorithm = metadata.customerEncryption.encryptionAlgorithm
  else:
    decryption_key_hash_sha256 = encryption_algorithm = None

  return gcs_resource_reference.GcsObjectResource(
      url,
      acl=_message_to_dict(metadata.acl),
      cache_control=metadata.cacheControl,
      component_count=metadata.componentCount,
      content_disposition=metadata.contentDisposition,
      content_encoding=metadata.contentEncoding,
      content_language=metadata.contentLanguage,
      content_type=metadata.contentType,
      crc32c_hash=metadata.crc32c,
      creation_time=metadata.timeCreated,
      contexts=_parse_contexts_from_object_metadata(metadata),
      custom_fields=_message_to_dict(metadata.metadata),
      custom_time=metadata.customTime,
      decryption_key_hash_sha256=decryption_key_hash_sha256,
      encryption_algorithm=encryption_algorithm,
      etag=metadata.etag,
      event_based_hold=(
          metadata.eventBasedHold if metadata.eventBasedHold else None
      ),
      hard_delete_time=metadata.hardDeleteTime,
      kms_key=metadata.kmsKeyName,
      md5_hash=metadata.md5Hash,
      metadata=metadata,
      metageneration=metadata.metageneration,
      noncurrent_time=metadata.timeDeleted,
      retention_expiration=metadata.retentionExpirationTime,
      retention_settings=_message_to_dict(metadata.retention),
      size=metadata.size,
      soft_delete_time=metadata.softDeleteTime,
      storage_class=metadata.storageClass,
      storage_class_update_time=metadata.timeStorageClassUpdated,
      temporary_hold=metadata.temporaryHold if metadata.temporaryHold else None,
      update_time=metadata.updated,
  )


def _get_matching_grant_identifier_to_remove_for_shim(
    existing_grant, grant_identifiers
):
  """Shim-only support for case-insensitive matching on non-entity metadata.

  Ports the logic here:
  https://github.com/GoogleCloudPlatform/gsutil/blob/0d9d0175b2b10430471c7b744646e56210f991d3/gslib/utils/acl_helper.py#L291

  Args:
    existing_grant (BucketAccessControl|ObjectAccessControl): A grant currently
      in a resource's access control list.
    grant_identifiers (Iterable[str]): User input specifying the grants to
      remove.

  Returns:
    A string matching existing_grant in grant_identifiers if one exists.
      Otherwise, None. Note that this involves preserving the original case of
      the identifier, despite the fact that this function performs a
      case-insensitive comparison.
  """
  # Making this mapping here is inefficient (O(n^2)), but it allows us to
  # compartmentalize shim logic. I/O time is likely our main bottleneck anyway.
  normalized_identifier_to_original = {
      identifier.lower(): identifier for identifier in grant_identifiers
  }

  if existing_grant.entityId:
    normalized_entity_id = existing_grant.entityId.lower()
    if normalized_entity_id in normalized_identifier_to_original:
      return normalized_identifier_to_original[normalized_entity_id]

  if existing_grant.email:
    normalized_email = existing_grant.email.lower()
    if normalized_email in normalized_identifier_to_original:
      return normalized_identifier_to_original[normalized_email]

  if existing_grant.domain:
    normalized_domain = existing_grant.domain.lower()
    if normalized_domain in normalized_identifier_to_original:
      return normalized_identifier_to_original[normalized_domain]

  if existing_grant.projectTeam:
    normalized_identifier = (
        '{}-{}'.format(
            existing_grant.projectTeam.team,
            existing_grant.projectTeam.projectNumber,
        )
    ).lower()
    if normalized_identifier in normalized_identifier_to_original:
      return normalized_identifier_to_original[normalized_identifier]

  if existing_grant.entity:
    normalized_entity = existing_grant.entity.lower()
    if (
        normalized_entity in normalized_identifier_to_original
        and normalized_entity in ['allusers', 'allauthenticatedusers']
    ):
      return normalized_identifier_to_original[normalized_entity]


def _get_list_with_added_and_removed_acl_grants(
    acl_list, resource_args, is_bucket=False, is_default_object_acl=False
):
  """Returns shallow copy of ACL policy object with requested changes.

  Args:
    acl_list (list): Contains Apitools ACL objects for buckets or objects.
    resource_args (request_config_factory._ResourceConfig): Contains desired
      changes for the ACL policy.
    is_bucket (bool): Used to determine if ACL for bucket or object. False
      implies a cloud storage object.
    is_default_object_acl (bool): Used to determine if target is default object
      ACL list.

  Returns:
    list: Shallow copy of acl_list with added and removed grants.
  """
  new_acl_list = []
  if is_default_object_acl:
    acl_identifiers_to_remove = set(
        resource_args.default_object_acl_grants_to_remove or []
    )
    acl_grants_to_add = resource_args.default_object_acl_grants_to_add or []
  else:
    acl_identifiers_to_remove = set(resource_args.acl_grants_to_remove or [])
    acl_grants_to_add = resource_args.acl_grants_to_add or []

  acl_identifiers_to_add = set(grant['entity'] for grant in acl_grants_to_add)

  found_match = {identifier: False for identifier in acl_identifiers_to_remove}
  for existing_grant in acl_list:
    if properties.VALUES.storage.run_by_gsutil_shim.GetBool():
      matched_identifier = _get_matching_grant_identifier_to_remove_for_shim(
          existing_grant, acl_identifiers_to_remove
      )
    elif existing_grant.entity in acl_identifiers_to_remove:
      matched_identifier = existing_grant.entity
    else:
      matched_identifier = None

    if matched_identifier in found_match:  # Grant should be removed.
      found_match[matched_identifier] = True

    # Gsutil's equivalent of this check involves checking more metadata fields:
    # https://github.com/GoogleCloudPlatform/gsutil/blob/0d9d0175b2b10430471c7b744646e56210f991d3/gslib/utils/acl_helper.py#L158
    # The shim handles creating entity strings and the case-insensitivity of
    # comparisons with the "all(Authenticated)Users" groups.
    elif existing_grant.entity not in acl_identifiers_to_add:
      # Grant is not being updated, so we add it as-is to new ACLs.
      new_acl_list.append(existing_grant)

  unmatched_entities = [k for k, v in found_match.items() if not v]
  if unmatched_entities:
    raise errors.Error(
        'ACL entities marked for removal did not match existing grants:'
        ' {}'.format(sorted(unmatched_entities))
    )

  acl_class = metadata_field_converters.get_bucket_or_object_acl_class(
      is_bucket)

  for new_grant in acl_grants_to_add:
    new_acl_list.append(
        acl_class(entity=new_grant.get('entity'), role=new_grant.get('role'))
    )

  return new_acl_list


def _get_labels_object_with_added_and_removed_labels(labels_object,
                                                     resource_args):
  """Returns shallow copy of bucket labels object with requested changes.

  Args:
    labels_object (messages.Bucket.LabelsValue|None): Existing labels.
    resource_args (request_config_factory._BucketConfig): Contains desired
      changes for labels list.

  Returns:
    messages.Bucket.LabelsValue|None: Contains shallow copy of labels list with
      added and removed values or None if there was no original object.
  """
  messages = apis.GetMessagesModule('storage', 'v1')
  if labels_object:
    existing_labels = labels_object.additionalProperties
  else:
    existing_labels = []
  new_labels = []

  labels_to_remove = set(resource_args.labels_to_remove or [])
  for existing_label in existing_labels:
    if existing_label.key in labels_to_remove:
      # The backend deletes labels whose value is None.
      new_labels.append(
          messages.Bucket.LabelsValue.AdditionalProperty(
              key=existing_label.key, value=None))
    else:
      new_labels.append(existing_label)

  labels_to_append = resource_args.labels_to_append or {}
  for key, value in labels_to_append.items():
    new_labels.append(
        messages.Bucket.LabelsValue.AdditionalProperty(key=key, value=value))

  if not (labels_object or new_labels):
    # Don't send extra data to the API if we're not adding or removing anything.
    return None
  # If all label objects have a None value, backend removes the whole property.
  return messages.Bucket.LabelsValue(additionalProperties=new_labels)


def update_bucket_metadata_from_request_config(bucket_metadata, request_config):
  """Sets Apitools Bucket fields based on values in request_config."""
  resource_args = getattr(request_config, 'resource_args', None)
  if not resource_args:
    return

  if (
      resource_args.enable_autoclass is not None
      or resource_args.autoclass_terminal_storage_class is not None
  ):
    bucket_metadata.autoclass = metadata_field_converters.process_autoclass(
        resource_args.enable_autoclass,
        resource_args.autoclass_terminal_storage_class,
    )
  if resource_args.enable_hierarchical_namespace is not None:
    bucket_metadata.hierarchicalNamespace = (
        metadata_field_converters.process_hierarchical_namespace(
            resource_args.enable_hierarchical_namespace
        )
    )
  if resource_args.cors_file_path is not None:
    bucket_metadata.cors = metadata_field_converters.process_cors(
        resource_args.cors_file_path)
  if resource_args.default_encryption_key is not None:
    bucket_metadata.encryption = (
        metadata_field_converters.process_default_encryption_key(
            resource_args.default_encryption_key))
  if resource_args.default_event_based_hold is not None:
    bucket_metadata.defaultEventBasedHold = (
        resource_args.default_event_based_hold)
  if resource_args.default_storage_class is not None:
    bucket_metadata.storageClass = (
        metadata_field_converters.process_default_storage_class(
            resource_args.default_storage_class))
  if (
      resource_args.ip_filter_file_path is not None
  ):
    bucket_metadata.ipFilter = metadata_field_converters.process_ip_filter(
        resource_args.ip_filter_file_path
    )
  if resource_args.lifecycle_file_path is not None:
    bucket_metadata.lifecycle = (
        metadata_field_converters.process_lifecycle(
            resource_args.lifecycle_file_path))
  if resource_args.location is not None:
    bucket_metadata.location = resource_args.location
  if (resource_args.log_bucket is not None or
      resource_args.log_object_prefix is not None):
    bucket_metadata.logging = metadata_field_converters.process_log_config(
        bucket_metadata.name, resource_args.log_bucket,
        resource_args.log_object_prefix)
  if resource_args.placement is not None:
    bucket_metadata.customPlacementConfig = (
        metadata_field_converters.process_placement_config(
            resource_args.placement))
  if (resource_args.public_access_prevention is not None or
      resource_args.uniform_bucket_level_access is not None):
    # Note: The IAM policy (with role grants) is stored separately because it
    # has its own API.
    bucket_metadata.iamConfiguration = (
        metadata_field_converters.process_bucket_iam_configuration(
            bucket_metadata.iamConfiguration,
            resource_args.public_access_prevention,
            resource_args.uniform_bucket_level_access))
  if resource_args.recovery_point_objective is not None:
    bucket_metadata.rpo = resource_args.recovery_point_objective
  if resource_args.requester_pays is not None:
    bucket_metadata.billing = (
        metadata_field_converters.process_requester_pays(
            bucket_metadata.billing, resource_args.requester_pays))
  if resource_args.retention_period is not None:
    bucket_metadata.retentionPolicy = (
        metadata_field_converters.process_retention_period(
            resource_args.retention_period))
  if resource_args.soft_delete_duration is not None:
    bucket_metadata.softDeletePolicy = (
        metadata_field_converters.process_soft_delete_duration(
            resource_args.soft_delete_duration
        )
    )
  if resource_args.versioning is not None:
    bucket_metadata.versioning = (
        metadata_field_converters.process_versioning(
            resource_args.versioning))
  if (resource_args.web_error_page is not None or
      resource_args.web_main_page_suffix is not None):
    bucket_metadata.website = metadata_field_converters.process_website(
        resource_args.web_error_page, resource_args.web_main_page_suffix)

  if resource_args.acl_file_path is not None:
    bucket_metadata.acl = metadata_field_converters.process_acl_file(
        resource_args.acl_file_path, is_bucket=True
    )
  bucket_metadata.acl = (
      _get_list_with_added_and_removed_acl_grants(
          bucket_metadata.acl,
          resource_args,
          is_bucket=True,
          is_default_object_acl=False))

  if resource_args.default_object_acl_file_path is not None:
    bucket_metadata.defaultObjectAcl = (
        metadata_field_converters.process_acl_file(
            resource_args.default_object_acl_file_path, is_bucket=False
        )
    )
  bucket_metadata.defaultObjectAcl = (
      _get_list_with_added_and_removed_acl_grants(
          bucket_metadata.defaultObjectAcl,
          resource_args,
          is_bucket=False,
          is_default_object_acl=True))

  if resource_args.labels_file_path is not None:
    bucket_metadata.labels = metadata_field_converters.process_labels(
        bucket_metadata.labels, resource_args.labels_file_path)
  # Can still add labels after clear.
  bucket_metadata.labels = _get_labels_object_with_added_and_removed_labels(
      bucket_metadata.labels, resource_args)


def get_cleared_bucket_fields(request_config):
  """Gets a list of fields to be included in requests despite null values."""
  cleared_fields = []
  resource_args = getattr(request_config, 'resource_args', None)
  if not resource_args:
    return cleared_fields

  if (
      resource_args.cors_file_path == user_request_args_factory.CLEAR
      or resource_args.cors_file_path
      and not metadata_util.cached_read_yaml_json_file(
          resource_args.cors_file_path
      )
  ):
    # Empty JSON object similar to CLEAR flag.
    cleared_fields.append('cors')

  if resource_args.default_encryption_key == user_request_args_factory.CLEAR:
    cleared_fields.append('encryption')

  if resource_args.default_storage_class == user_request_args_factory.CLEAR:
    cleared_fields.append('storageClass')

  if resource_args.labels_file_path == user_request_args_factory.CLEAR:
    cleared_fields.append('labels')

  if (
      resource_args.lifecycle_file_path == user_request_args_factory.CLEAR
      or resource_args.lifecycle_file_path
      and not metadata_util.cached_read_yaml_json_file(
          resource_args.lifecycle_file_path
      )
  ):
    # Empty JSON object similar to CLEAR flag.
    cleared_fields.append('lifecycle')

  if resource_args.log_bucket == user_request_args_factory.CLEAR:
    cleared_fields.append('logging')
  elif resource_args.log_object_prefix == user_request_args_factory.CLEAR:
    cleared_fields.append('logging.logObjectPrefix')

  if resource_args.public_access_prevention == user_request_args_factory.CLEAR:
    cleared_fields.append('iamConfiguration.publicAccessPrevention')

  if resource_args.retention_period == user_request_args_factory.CLEAR:
    cleared_fields.append('retentionPolicy')

  if (
      resource_args.web_error_page
      == resource_args.web_main_page_suffix
      == user_request_args_factory.CLEAR
  ):
    cleared_fields.append('website')
  elif resource_args.web_error_page == user_request_args_factory.CLEAR:
    cleared_fields.append('website.notFoundPage')
  elif resource_args.web_main_page_suffix == user_request_args_factory.CLEAR:
    cleared_fields.append('website.mainPageSuffix')

  return cleared_fields


def get_cleared_ip_filter_fields(
    ip_filter
) -> List[str]:
  """Returns cleared IP filter fields for the bucket.

  Args:
    ip_filter: IP filter object.

  Returns:
    List of IP filter fields to be cleared.
  """
  cleared_fields = []
  if ip_filter.mode is None:
    cleared_fields.append('ipFilter.mode')
  if ip_filter.publicNetworkSource is None:
    cleared_fields.append('ipFilter.publicNetworkSource')
  elif (
      ip_filter.publicNetworkSource is not None
      and not ip_filter.publicNetworkSource.allowedIpCidrRanges
  ):
    cleared_fields.append('ipFilter.publicNetworkSource.allowedIpCidrRanges')
  if not ip_filter.vpcNetworkSources:
    cleared_fields.append('ipFilter.vpcNetworkSources')
  return cleared_fields


def get_cache_control(should_gzip_locally, resource_args):
  """Returns cache control metadata value.

  If should_gzip_locally is True, append 'no-transform' to cache control value
  with the user's given value.

  Args:
    should_gzip_locally (bool): True if file should be gzip locally.
    resource_args (request_config_factory._ObjectConfig): Holds settings for a
      cloud resource.

  Returns:
    (str|None) Cache control value.
  """
  if isinstance(resource_args, request_config_factory._ObjectConfig):  # pylint: disable=protected-access
    user_cache_control = resource_args.cache_control
  else:
    user_cache_control = None

  if should_gzip_locally:
    return (
        _NO_TRANSFORM
        if user_cache_control is None
        else '{}, {}'.format(user_cache_control, _NO_TRANSFORM)
    )

  return user_cache_control


def get_content_encoding(should_gzip_locally, resource_args):
  """Returns content encoding metadata value.

  If should_gzip_locally is True, return gzip.

  Args:
    should_gzip_locally (bool): True if file should be gzip locally.
    resource_args (request_config_factory._ObjectConfig): Holds settings for a
      cloud resource.

  Returns:
    (str|None) Content encoding value.
  """
  if should_gzip_locally:
    return 'gzip'

  if isinstance(resource_args, request_config_factory._ObjectConfig):  # pylint: disable=protected-access
    return resource_args.content_encoding

  return None


def get_should_gzip_locally(attributes_resource, request_config):
  if isinstance(attributes_resource, resource_reference.FileObjectResource):
    return gzip_util.should_gzip_locally(
        request_config.gzip_settings,
        attributes_resource.storage_url.resource_name,
    )

  return False


def process_value_or_clear_flag(metadata, key, value):
  """Sets appropriate metadata based on value."""
  if value == user_request_args_factory.CLEAR:
    setattr(metadata, key, None)
  elif value is not None:
    setattr(metadata, key, value)


def get_contexts_dict_from_custom_contexts_dict(custom_contexts_dict):
  """Returns contexts dict from custom contexts dict."""
  return {ContextType.CUSTOM.value.lower(): custom_contexts_dict}


def parse_custom_contexts_from_file(file_path):
  """Parses custom contexts from a file, and validate it against the message.

  Args:
    file_path (str): Path to the file containing the custom contexts.

  Returns:
    A dictionary containing the custom contexts parsed from the file.

  Raises:
    errors.Error: If the provided file doesn't exist or is not a valid
      json/yaml file.
  """
  try:
    messages = apis.GetMessagesModule('storage', 'v1')
    parsed_custom_contexts = metadata_util.cached_read_yaml_json_file(
        file_path
    )

    # We extract the value from the dict, because user might specify other
    # fields in the dict which API might not expect, such as 'type' or
    # 'createTime'.
    extracted_custom_contexts = {}
    for key, value in parsed_custom_contexts.items():
      if (
          not isinstance(value, dict)
          or CONTEXT_VALUE_LITERAL not in value
      ):
        raise errors.Error('Invalid format for specified contexts file.')

      extracted_custom_contexts[key] = (
          get_context_value_dict_from_value(
              value[CONTEXT_VALUE_LITERAL]
          )
      )

    # Validate the parsed content respect the API message.
    encoding_helper.DictToMessage(
        extracted_custom_contexts, messages.Object.ContextsValue.CustomValue
    )
    return extracted_custom_contexts
  except (errors.InvalidUrlError, AttributeError):
    raise errors.Error(
        'Error while parsing the specified contexts file, please ensure that'
        ' specified file exists and is valid.',
    )


def get_updated_custom_contexts(
    existing_custom_contexts,
    request_config,
    method_type,
):
  """Returns a dictionary with updated custom contexts based on a merge strategy.

  Args:
    existing_custom_contexts (dict): Existing custom contexts for an object.
    request_config (request_config): May contain custom contexts fields that
      should be modified.
    method_type (MethodType): method_type for which the custom contexts are
      being updated.

  Returns:
    A dictionary containing the updated custom contexts, or None if no updates
    are needed.
  """
  resource_args = request_config.resource_args
  if not resource_args or not (
      resource_args.custom_contexts_to_set
      or resource_args.custom_contexts_to_remove
      or resource_args.custom_contexts_to_update
  ):
    return (
        copy.deepcopy(existing_custom_contexts)
        if method_type != MethodType.OBJECT_PATCH
        else None
    )

  if resource_args.custom_contexts_to_set == user_request_args_factory.CLEAR:
    return {}

  if resource_args.custom_contexts_to_set:
    if isinstance(resource_args.custom_contexts_to_set, str):
      # It points to a file, so parse it and update the contexts.
      parsed_custom_contexts = parse_custom_contexts_from_file(
          resource_args.custom_contexts_to_set
      )
    else:
      parsed_custom_contexts = {
          key: get_context_value_dict_from_value(value)
          for key, value in resource_args.custom_contexts_to_set.items()
      }

    if method_type != MethodType.OBJECT_PATCH:
      # 'override' behavior: Just return the new set of contexts
      return parsed_custom_contexts

    # `patch` behavior: Clear existing contexts before setting new ones
    updated_custom_contexts = {elem: {} for elem in existing_custom_contexts}
    updated_custom_contexts.update(**parsed_custom_contexts)
    return updated_custom_contexts

  updated_custom_contexts = (
      copy.deepcopy(existing_custom_contexts)
      if method_type != MethodType.OBJECT_PATCH
      else {}
  )

  if resource_args.custom_contexts_to_remove:
    for key in resource_args.custom_contexts_to_remove:
      if method_type == MethodType.OBJECT_PATCH:
        # patch behavior: Remove the specified contexts from the existing
        # contexts
        updated_custom_contexts[key] = {}
      else:
        # override behavior: Remove the keys from the updated contexts.
        updated_custom_contexts.pop(key, None)

  if resource_args.custom_contexts_to_update:
    for key, value in resource_args.custom_contexts_to_update.items():
      updated_custom_contexts[key] = (
          get_context_value_dict_from_value(value)
      )

  return updated_custom_contexts


def update_custom_contexts_from_request_config(
    object_metadata,
    request_config,
    method_type=None,
):
  """Updates custom contexts in object_metadata based on request_config.

  Args:
    object_metadata (storage_v1_messages.Object): Existing object metadata.
    request_config (request_config): May contain custom contexts fields that
      should be modified.
    method_type (MethodType): Different API methods have different metadata
      handling. for example, objects.rewrite handles contexts updates in one
      way, while objects.patch handles contexts updates in another way. This
      variable is tune the handling of metadata updates based on the API method.
      If invalid an error will be thrown, if None, the default handling
      would be used (object.patch behavior)

  Raises:
    errors.Error: If the method type is not supported.
  """
  if not object_metadata.contexts or not object_metadata.contexts.custom:
    existing_custom_contexts = {}
  else:
    existing_custom_contexts = encoding_helper.MessageToDict(
        object_metadata.contexts.custom
    )

  if not method_type:
    method_type = MethodType.OBJECT_PATCH
  elif not isinstance(method_type, MethodType):
    raise errors.Error(
        'Unsupported method type for updating custom contexts: {}.'.format(
            method_type
        )
    )

  messages = apis.GetMessagesModule('storage', 'v1')
  custom_contexts_dict = get_updated_custom_contexts(
      existing_custom_contexts, request_config, method_type
  )
  if method_type != MethodType.OBJECT_PATCH and not custom_contexts_dict:
    custom_contexts_dict = None

  if custom_contexts_dict is not None:
    object_metadata.contexts = encoding_helper.DictToMessage(
        get_contexts_dict_from_custom_contexts_dict(custom_contexts_dict),
        messages.Object.ContextsValue,
    )
  else:
    object_metadata.contexts = None


def update_object_metadata_from_request_config(
    object_metadata,
    request_config,
    attributes_resource=None,
    posix_to_set=None,
    method_type=None,
):
  """Sets Apitools Object fields based on values in request_config.

  User custom metadata takes precedence over preserved POSIX data.
  Gzip metadata changes take precedence over user custom metadata.

  Args:
    object_metadata (storage_v1_messages.Object): Existing object metadata.
    request_config (request_config): May contain data to add to object_metadata.
    attributes_resource (Resource|None): If present, used for parsing POSIX and
      symlink data from a resource for the --preserve-posix and/or
      --preserve_symlink flags. This value is ignored unless it is an instance
      of FileObjectResource.
    posix_to_set (PosixAttributes|None): Set as custom metadata on target.
    method_type (MethodType|None): Different API methods have different metadata
      handling, for example, objects.rewrite handles contexts updates in one
      way, while objects.patch handles contexts updates in another way. This
      variable is tune the handling of metadata updates based on the API method.
      If invalid, an error will be thrown. If None, the default handling
      would be used for each metadata field.

  Raises:
    errors.Error: If the method type is not supported.
  """
  resource_args = request_config.resource_args

  # Custom metadata & POSIX handling.
  if not object_metadata.metadata:
    existing_metadata = {}
  else:
    existing_metadata = encoding_helper.MessageToDict(
        object_metadata.metadata)

  custom_fields_dict = metadata_util.get_updated_custom_fields(
      existing_metadata,
      request_config,
      attributes_resource=attributes_resource,
      known_posix=posix_to_set,
  )
  if custom_fields_dict is not None:
    messages = apis.GetMessagesModule('storage', 'v1')
    object_metadata.metadata = encoding_helper.DictToMessage(
        custom_fields_dict, messages.Object.MetadataValue)

  # Custom contexts handling.
  update_custom_contexts_from_request_config(
      object_metadata, request_config, method_type
  )

  should_gzip_locally = get_should_gzip_locally(
      attributes_resource, request_config)

  cache_control = get_cache_control(should_gzip_locally, resource_args)
  process_value_or_clear_flag(object_metadata, 'cacheControl', cache_control)

  content_encoding = get_content_encoding(should_gzip_locally, resource_args)
  process_value_or_clear_flag(object_metadata, 'contentEncoding',
                              content_encoding)

  if not resource_args:
    return

  # Encryption handling.
  if resource_args.encryption_key:
    if (resource_args.encryption_key == user_request_args_factory.CLEAR or
        resource_args.encryption_key.type is encryption_util.KeyType.CSEK):
      object_metadata.kmsKeyName = None
      # For CSEK, set the encryption in API request headers instead.
      object_metadata.customerEncryption = None
    elif resource_args.encryption_key.type is encryption_util.KeyType.CMEK:
      object_metadata.kmsKeyName = resource_args.encryption_key.key

  # General metadata handling.
  process_value_or_clear_flag(
      object_metadata, 'contentDisposition', resource_args.content_disposition
  )
  process_value_or_clear_flag(
      object_metadata, 'contentLanguage', resource_args.content_language
  )
  process_value_or_clear_flag(
      object_metadata, 'customTime', resource_args.custom_time
  )
  process_value_or_clear_flag(
      object_metadata, 'contentType', resource_args.content_type
  )
  process_value_or_clear_flag(
      object_metadata, 'md5Hash', resource_args.md5_hash
  )
  process_value_or_clear_flag(
      object_metadata, 'storageClass', resource_args.storage_class
  )

  if resource_args.acl_file_path is not None:
    object_metadata.acl = metadata_field_converters.process_acl_file(
        resource_args.acl_file_path
    )
  object_metadata.acl = _get_list_with_added_and_removed_acl_grants(
      object_metadata.acl, resource_args, is_bucket=False
  )

  if resource_args.event_based_hold is not None:
    object_metadata.eventBasedHold = resource_args.event_based_hold
  if resource_args.temporary_hold is not None:
    object_metadata.temporaryHold = resource_args.temporary_hold

  object_metadata.retention = (
      metadata_field_converters.process_object_retention(
          object_metadata.retention,
          resource_args.retain_until,
          resource_args.retention_mode,
      )
  )


def get_cleared_object_fields(request_config):
  """Gets a list of fields to be included in requests despite null values."""
  cleared_fields = []
  resource_args = request_config.resource_args
  if not resource_args:
    return cleared_fields

  if resource_args.cache_control == user_request_args_factory.CLEAR:
    cleared_fields.append('cacheControl')

  if resource_args.content_type == user_request_args_factory.CLEAR:
    cleared_fields.append('contentType')

  if resource_args.content_disposition == user_request_args_factory.CLEAR:
    cleared_fields.append('contentDisposition')

  if resource_args.content_encoding == user_request_args_factory.CLEAR:
    cleared_fields.append('contentEncoding')

  if resource_args.content_language == user_request_args_factory.CLEAR:
    cleared_fields.append('contentLanguage')

  if resource_args.custom_time == user_request_args_factory.CLEAR:
    cleared_fields.append('customTime')

  if (
      resource_args.retain_until == user_request_args_factory.CLEAR
      or resource_args.retention_mode == user_request_args_factory.CLEAR
  ):
    cleared_fields.append('retention')

  return cleared_fields


def get_managed_folder_resource_from_metadata(metadata):
  """Returns a ManagedFolderResource from Apitools metadata."""
  url = storage_url.CloudUrl(
      scheme=storage_url.ProviderPrefix.GCS,
      bucket_name=metadata.bucket,
      resource_name=metadata.name,
  )
  return resource_reference.ManagedFolderResource(
      url,
      create_time=metadata.createTime,
      metadata=metadata,
      metageneration=metadata.metageneration,
      update_time=metadata.updateTime,
  )


def get_folder_resource_from_metadata(metadata):
  """Returns a FolderResource from Apitools metadata."""
  url = storage_url.CloudUrl(
      scheme=storage_url.ProviderPrefix.GCS,
      bucket_name=metadata.bucket,
      resource_name=metadata.name,
  )
  return resource_reference.FolderResource(
      url,
      create_time=metadata.createTime,
      metadata=metadata,
      metageneration=metadata.metageneration,
      update_time=metadata.updateTime,
  )