HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/deploy/manifest_util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for parsing the cloud deploy resource to yaml definition."""


import collections
import dataclasses
import datetime
import enum
import re
from typing import Any, Callable, Optional

from apitools.base.py import encoding
from dateutil import parser
from googlecloudsdk.api_lib.util import messages as messages_util
from googlecloudsdk.command_lib.deploy import automation_util
from googlecloudsdk.command_lib.deploy import custom_target_type_util
from googlecloudsdk.command_lib.deploy import delivery_pipeline_util
from googlecloudsdk.command_lib.deploy import deploy_policy_util
from googlecloudsdk.command_lib.deploy import exceptions
from googlecloudsdk.command_lib.deploy import target_util
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core.resource import resource_property
import jsonschema

PIPELINE_UPDATE_MASK = '*,labels'
API_VERSION_V1BETA1 = 'deploy.cloud.google.com/v1beta1'
API_VERSION_V1 = 'deploy.cloud.google.com/v1'
# If changing these fields also change them in the UI code.
NAME_FIELD = 'name'
ADVANCE_ROLLOUT_FIELD = 'advanceRollout'
PROMOTE_RELEASE_FIELD = 'promoteRelease'
REPAIR_ROLLOUT_FIELD = 'repairRollout'
WAIT_FIELD = 'wait'
LABELS_FIELD = 'labels'
ANNOTATIONS_FIELD = 'annotations'
SELECTOR_FIELD = 'selector'
RULES_FIELD = 'rules'
ID_FIELD = 'id'
ADVANCE_ROLLOUT_RULE_FIELD = 'advanceRolloutRule'
PROMOTE_RELEASE_RULE_FIELD = 'promoteReleaseRule'
REPAIR_ROLLOUT_RULE_FIELD = 'repairRolloutRule'
TIMED_PROMOTE_RELEASE_RULE_FIELD = 'timedPromoteReleaseRule'
DESTINATION_TARGET_ID_FIELD = 'destinationTargetId'
SOURCE_PHASES_FIELD = 'sourcePhases'
PHASES_FIELD = 'phases'
DESTINATION_PHASE_FIELD = 'destinationPhase'
DISABLE_ROLLBACK_IF_ROLLOUT_PENDING = 'disableRollbackIfRolloutPending'
TARGET_FIELD = 'target'
METADATA_FIELDS = [ANNOTATIONS_FIELD, LABELS_FIELD, NAME_FIELD]
EXCLUDE_FIELDS = [
    'createTime',
    'customTargetTypeId',
    'etag',
    'targetId',
    'uid',
    'updateTime',
] + METADATA_FIELDS
JOBS_FIELD = 'jobs'
RETRY_FIELD = 'retry'
ATTEMPTS_FIELD = 'attempts'
ROLLBACK_FIELD = 'rollback'
REPAIR_PHASES_FIELD = 'repairPhases'
BACKOFF_MODE_FIELD = 'backoffMode'
SCHEDULE_FIELD = 'schedule'
TIME_ZONE_FIELD = 'timeZone'
LABELS_FIELD = 'labels'


@enum.unique
class ResourceKind(enum.Enum):
  TARGET = 'Target'
  DELIVERY_PIPELINE = 'DeliveryPipeline'
  AUTOMATION = 'Automation'
  CUSTOM_TARGET_TYPE = 'CustomTargetType'
  DEPLOY_POLICY = 'DeployPolicy'

  def __str__(self):
    return self.value


@dataclasses.dataclass
class _TransformContext:
  kind: ResourceKind
  name: str
  project: str
  region: str
  manifest: dict[str, Any]
  field: str


def ParseDeployConfig(
    messages: list[Any], manifests: list[dict[str, Any]], region: str
) -> dict[ResourceKind, list[Any]]:
  """Parses the declarative definition of the resources into message.

  Args:
    messages: module containing the definitions of messages for Cloud Deploy.
    manifests: [str], the list of parsed resource yaml definitions.
    region: str, location ID.

  Returns:
    A dictionary of ResourceKind and list of messages.
  Raises:
    exceptions.CloudDeployConfigError, if the declarative definition is
    incorrect.
  """
  project = properties.VALUES.core.project.GetOrFail()

  manifests_with_metadata = collections.defaultdict(list)
  for i, manifest in enumerate(manifests):
    kind = _GetKind(manifest, i)
    name = _GetName(manifest, i)
    manifests_with_metadata[(kind, name)].append(manifest)

  _CheckDuplicateResourceNames(manifests_with_metadata)

  resources_by_kind = collections.defaultdict(list)
  for (kind, name), manifests in manifests_with_metadata.items():
    resources_by_kind[kind].append(
        # Thanks to _CheckDuplicateResourceNames(), we know manifests has
        # exactly one element.
        _ParseManifest(kind, name, manifests[0], project, region, messages)
    )
  return resources_by_kind


def _GetKind(manifest: dict[str, Any], doc_index: int) -> ResourceKind:
  """Parses the kind of a resource."""
  api_version = manifest.get('apiVersion')
  if not api_version:
    raise exceptions.CloudDeployConfigError.for_unnamed_manifest(
        doc_index, 'missing required field "apiVersion"'
    )
  if api_version not in {API_VERSION_V1BETA1, API_VERSION_V1}:
    raise exceptions.CloudDeployConfigError.for_unnamed_manifest(
        doc_index, f'api version "{api_version}" not supported'
    )
  kind = manifest.get('kind')
  if kind is None:
    raise exceptions.CloudDeployConfigError.for_unnamed_manifest(
        doc_index, 'missing required field "kind"'
    )
  # In Python 3.12+ we can just use `if kind not in ResourceKind`
  if kind not in [str(r) for r in ResourceKind]:
    raise exceptions.CloudDeployConfigError.for_unnamed_manifest(
        doc_index, f'kind "{kind}" not supported'
    )
  return ResourceKind(kind)


def _GetName(manifest: dict[str, Any], doc_index: int) -> str:
  """Parses the name of a resource."""
  metadata = manifest.get('metadata')
  if not metadata or not metadata.get(NAME_FIELD):
    raise exceptions.CloudDeployConfigError.for_unnamed_manifest(
        doc_index, 'missing required field "metadata.name"'
    )
  return metadata['name']


def _CheckDuplicateResourceNames(
    manifests_by_name_and_kind: dict[tuple[ResourceKind, str], list[Any]],
) -> None:
  """Checks if there are any duplicate resource names per resource type."""
  dups = collections.defaultdict(list)
  for (kind, name), manifests in manifests_by_name_and_kind.items():
    if len(manifests) > 1:
      dups[kind].append(name)
  errors = []
  for kind, names in dups.items():
    errors.append(f'{kind} has duplicate name(s): {names}')
  if errors:
    raise exceptions.CloudDeployConfigError(errors)


def _RemoveApiVersionAndKind(
    value: dict[str, Any], transform_context: _TransformContext
) -> None:
  """Removes the apiVersion and kind fields from the manifest."""
  del value
  del transform_context.manifest['apiVersion']
  del transform_context.manifest['kind']


def _MetadataYamlToProto(
    metadata: dict[str, Any], transform_context: _TransformContext
) -> None:
  """Moves the fields in metadata to the top level of the manifest."""
  manifest = transform_context.manifest
  manifest[ANNOTATIONS_FIELD] = metadata.get(ANNOTATIONS_FIELD)
  manifest[LABELS_FIELD] = metadata.get(LABELS_FIELD)
  # I think allowing description in metadata was an accident, not intentional...
  # It's supposed to be a top level field. But even some of our own tests do
  # this, so I think we have to assume customers might have too.
  if 'description' in metadata:
    manifest['description'] = metadata['description']
  name = metadata.get(NAME_FIELD)
  resource_ref = _ref_creators[transform_context.kind](
      name, transform_context.project, transform_context.region
  )
  # Name() does _not_ return the resource name. It returns the part of the name
  # after the resource type. For example, for targets, it returns everything
  # after `/targets/`. So this tells us if the user provided an invalid resource
  # ID, which would lead to a confusing error when we try to call the API.
  if '/' in resource_ref.Name():
    raise exceptions.CloudDeployConfigError.for_resource_field(
        transform_context.kind,
        name,
        'metadata.name',
        f'invalid resource ID "{resource_ref.Name()}"',
    )
  manifest[NAME_FIELD] = resource_ref.RelativeName()
  del manifest['metadata']


def _ConvertLabelsToSnakeCase(
    labels: dict[str, str], transform_context: _TransformContext
) -> dict[str, str]:
  """Convert labels from camelCase to snake_case."""
  del transform_context
  # See go/unified-cloud-labels-proposal.
  return {resource_property.ConvertToSnakeCase(k): v for k, v in labels.items()}


def _UpdateOldAutomationSelector(
    selector: dict[str, Any], transform_context: _TransformContext
) -> dict[str, Any]:
  """Converts an old automation selector to the new format."""
  targets = []
  for target in selector:
    targets.append(target[TARGET_FIELD])
  transform_context.manifest['selector'] = {'targets': targets}


def _RenameOldAutomationRules(
    rule: dict[str, Any], transform_context: _TransformContext
) -> dict[str, Any]:
  """Renames the old automation rule fields to the new format."""
  del transform_context
  if PROMOTE_RELEASE_FIELD in rule:
    rule[PROMOTE_RELEASE_RULE_FIELD] = rule[PROMOTE_RELEASE_FIELD]
    del rule[PROMOTE_RELEASE_FIELD]
  if ADVANCE_ROLLOUT_FIELD in rule:
    rule[ADVANCE_ROLLOUT_RULE_FIELD] = rule[ADVANCE_ROLLOUT_FIELD]
    del rule[ADVANCE_ROLLOUT_FIELD]
  if REPAIR_ROLLOUT_FIELD in rule:
    rule[REPAIR_ROLLOUT_RULE_FIELD] = rule[REPAIR_ROLLOUT_FIELD]
    del rule[REPAIR_ROLLOUT_FIELD]
  return rule


def _ConvertAutomationRuleNameFieldToId(
    rule: dict[str, Any], transform_context: _TransformContext
) -> dict[str, Any]:
  """Move the name field to the id field."""
  if rule is not None and NAME_FIELD in rule:
    if ID_FIELD in rule:
      raise exceptions.CloudDeployConfigError.for_resource(
          transform_context.kind,
          transform_context.name,
          'automation rule has both name and id fields',
      )
    rule[ID_FIELD] = rule[NAME_FIELD]
    del rule[NAME_FIELD]
  return rule


def _AddEmptyRepairAutomationRetryMessage(
    repair_phase: dict[str, Any], transform_context: _TransformContext
) -> dict[str, Any]:
  """Add an empty retry field if it's defined in the manifest but set to None."""
  del transform_context
  if RETRY_FIELD in repair_phase and repair_phase[RETRY_FIELD] is None:
    repair_phase[RETRY_FIELD] = {}
  return repair_phase


def _ConvertRepairAutomationBackoffModeEnumValuesToProtoFormat(
    value: str, transform_context
) -> str:
  """Converts the backoffMode values to the proto enum names."""
  del transform_context

  if not value.startswith('BACKOFF_MODE_'):
    return 'BACKOFF_MODE_' + value


def _ConvertAutomationWaitMinToSec(
    wait: str, transform_context: _TransformContext
) -> str:
  """Converts the wait time from (for example) "5m" to "300s"."""
  del transform_context
  if not re.fullmatch(r'\d+m', wait):
    raise exceptions.AutomationWaitFormatError()
  mins = wait[:-1]
  # convert the minute to second
  seconds = int(mins) * 60
  return '%ss' % seconds


def _ConvertPolicyOneTimeWindowToProtoFormat(
    value: dict[str, Any], transform_context: _TransformContext
) -> dict[str, Any]:
  """Converts the one time window to proto format."""
  proto_format = {}
  if value.get('start'):
    _SetDateTimeFields(value['start'], 'start', proto_format, transform_context)
  if value.get('end'):
    _SetDateTimeFields(value['end'], 'end', proto_format, transform_context)
  # Any other (unknown) fields will cause errors in the proto conversion so we
  # don't need to check for them here.
  return proto_format


def _SetDateTimeFields(
    date_str: str,
    field_name: str,
    proto_format: dict[str, str],
    transform_context: _TransformContext,
) -> None:
  """Convert the date string to proto format and set those fields in proto_format."""
  try:
    date_time = parser.isoparse(date_str)
  except ValueError:
    raise exceptions.CloudDeployConfigError.for_resource_field(
        transform_context.kind,
        transform_context.name,
        field_name,
        'invalid date string: "{date_str}". Must be a valid date in ISO 8601'
        ' format (e.g. {field_name}: "2024-12-24 17:00)',
    )
  except TypeError:
    raise exceptions.CloudDeployConfigError.for_resource_field(
        transform_context.kind,
        transform_context.name,
        field_name,
        'invalid date string: {date_str}. Make sure to put quotes around the'
        ' date string (e.g. {field_name}: "2024-09-27 18:30:31.123") so that it'
        ' is interpreted as a string and not a yaml timestamp.',
    )
  if date_time.tzinfo is not None:
    raise exceptions.CloudDeployConfigError.for_resource_field(
        transform_context.kind,
        transform_context.name,
        field_name,
        'invalid date string: {date_str}. Do not include a timezone or timezone'
        ' offset in the date string. Specify the timezone in the timeZone'
        ' field.',
    )
  date_obj = {
      'year': date_time.year,
      'month': date_time.month,
      'day': date_time.day,
  }
  time_obj = {
      'hours': date_time.hour,
      'minutes': date_time.minute,
      'seconds': date_time.second,
      'nanos': date_time.microsecond * 1000,
  }
  date_field = f'{field_name}Date'
  proto_format[date_field] = date_obj
  time_field = f'{field_name}Time'
  proto_format[time_field] = time_obj


def _ConvertPolicyWeeklyWindowTimes(
    value: str, transform_context: _TransformContext
) -> dict[str, str]:
  """Convert the weekly window times to proto format."""
  # First, check if the hour is 24. If so replace with 00, because
  # fromisoformat() doesn't support 24.
  hour_24 = False
  if value.startswith('24:'):
    hour_24 = True
    value = value.replace('24', '00', 1)
  try:
    time_obj = datetime.time.fromisoformat(value)
  except ValueError:
    raise exceptions.CloudDeployConfigError.for_resource_field(
        transform_context.kind,
        transform_context.name,
        transform_context.field,
        f'invalid time string: "{value}"',
    )
  hour_value = time_obj.hour
  if hour_24:
    hour_value = 24
  return {
      'hours': hour_value,
      'minutes': time_obj.minute,
      'seconds': time_obj.second,
      'nanos': time_obj.microsecond * 1000,
  }


def _ReplaceCustomTargetType(
    value: str, transform_context: _TransformContext
) -> str:
  """Converts a custom target type ID or name to a resource name.

  This is handled specially because we allow providing either the ID or name for
  the custom target type referenced. When the ID is provided we need to
  construct the name.

  Args:
    value: the current value of the customTargetType field.
    transform_context: _TransformContext, data about the current parsing
      context.

  Returns:
    The custom target type resource name.
  """
  # If field contains '/' then we assume it's the name instead of the ID. No
  # adjustments required.
  if '/' in value:
    return value

  return custom_target_type_util.CustomTargetTypeReference(
      value, transform_context.project, transform_context.region
  ).RelativeName()


@dataclasses.dataclass
class TransformConfig:
  """Represents a field that needs transformation during parsing.

  Attributes:
    kinds: The ResourceKinds that this transformation applies to.
    fields: A dot separated list of fields that require special handling. These
      are relative to the top level of the manifest and can contain `[]` to
      represent an array field.
    replace: A function that is called when the field is encountered. The return
      value will be used in place of the original value.
    move: A function that is called when the field is encountered. This is used
      for fields that should be moved to a different location in the manifest.
      The function should modify the transform_context.manifest in place.
    schema: An optional JSON schema that is used to validate the field.
  """

  kinds: set[ResourceKind]
  # A dot separated list of fields that require special handling.
  fields: dict[str]
  # One of `replace` or `move` must be set. The first argument is the value of
  # the field specified in `fields`.
  # If `replace` is set, the function should return the new value and the
  # parsing code will handle the substitution.
  # If `move` is set, the function should modify the transform_context.manifest
  # in place and the parsing code will skip the field.
  replace: Optional[Callable[[Any, _TransformContext], Any]] = None
  move: Optional[Callable[[Any, _TransformContext], None]] = None
  schema: Optional[dict[str, Any]] = None


_PARSE_TRANSFORMS = [
    TransformConfig(
        kinds=set(ResourceKind),
        fields=['apiVersion'],
        move=_RemoveApiVersionAndKind,
    ),
    TransformConfig(
        kinds=set(ResourceKind),
        fields=['metadata'],
        move=_MetadataYamlToProto,
        schema={
            'type': 'object',
            'required': ['name'],
            'properties': {
                'name': {'type': 'string'},
                'description': {'type': 'string'},
                'annotations': {
                    'type': 'object',
                    'additionalProperties': {'type': 'string'},
                },
                'labels': {
                    'type': 'object',
                    'additionalProperties': {'type': 'string'},
                },
            },
            'additionalProperties': False,
        },
    ),
    TransformConfig(
        kinds=set(ResourceKind),
        fields=['labels'],
        replace=_ConvertLabelsToSnakeCase,
        schema={'type': 'object', 'additionalProperties': {'type': 'string'}},
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=['selector[]'],
        move=_UpdateOldAutomationSelector,
        schema={
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {'target': {'type': 'object'}},
                'required': ['target'],
                'additionalProperties': True,
            },
        },
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=['rules[]'],
        replace=_RenameOldAutomationRules,
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=[
            'rules[].repairRolloutRule',
            'rules[].advanceRolloutRule',
            'rules[].promoteReleaseRule',
            'rules[].timedPromoteReleaseRule',
        ],
        replace=_ConvertAutomationRuleNameFieldToId,
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=['rules[].repairRolloutRule.repairPhases[]'],
        replace=_AddEmptyRepairAutomationRetryMessage,
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=['rules[].repairRolloutRule.repairPhases[].retry.backoffMode'],
        replace=_ConvertRepairAutomationBackoffModeEnumValuesToProtoFormat,
        schema={'type': 'string'},
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=[
            'rules[].repairRolloutRule.repairPhases[].retry.wait',
            'rules[].advanceRolloutRule.wait',
            'rules[].promoteReleaseRule.wait',
        ],
        replace=_ConvertAutomationWaitMinToSec,
        schema={'type': 'string'},
    ),
    TransformConfig(
        kinds=[ResourceKind.DEPLOY_POLICY],
        fields=['rules[].rolloutRestriction.timeWindows.oneTimeWindows[]'],
        replace=_ConvertPolicyOneTimeWindowToProtoFormat,
    ),
    TransformConfig(
        kinds=[ResourceKind.DEPLOY_POLICY],
        fields=[
            'rules[].rolloutRestriction.timeWindows.weeklyWindows[].startTime',
            'rules[].rolloutRestriction.timeWindows.weeklyWindows[].endTime',
        ],
        replace=_ConvertPolicyWeeklyWindowTimes,
        schema={'type': 'string'},
    ),
    TransformConfig(
        kinds=[ResourceKind.TARGET],
        fields=['customTarget.customTargetType'],
        replace=_ReplaceCustomTargetType,
        schema={'type': 'string'},
    ),
]


_ref_creators = {
    ResourceKind.AUTOMATION: automation_util.AutomationReference,
    ResourceKind.CUSTOM_TARGET_TYPE: (
        custom_target_type_util.CustomTargetTypeReference
    ),
    ResourceKind.DELIVERY_PIPELINE: (
        delivery_pipeline_util.DeliveryPipelineReference
    ),
    ResourceKind.DEPLOY_POLICY: deploy_policy_util.DeployPolicyReference,
    ResourceKind.TARGET: target_util.TargetReference,
}

_message_types = {
    ResourceKind.AUTOMATION: lambda messages: messages.Automation,
    ResourceKind.CUSTOM_TARGET_TYPE: lambda messages: messages.CustomTargetType,
    ResourceKind.DELIVERY_PIPELINE: lambda messages: messages.DeliveryPipeline,
    ResourceKind.DEPLOY_POLICY: lambda messages: messages.DeployPolicy,
    ResourceKind.TARGET: lambda messages: messages.Target,
}


def _ParseManifest(
    kind: ResourceKind,
    name: str,
    manifest: dict[str, Any],
    project: str,
    region: str,
    messages: list[Any],
) -> Any:
  """Parses a v1beta1/v1 config manifest into a message using proto transforms.

  The parser calls a series of transforms on the manifest dictionary to convert
  it into a form expected by the proto message definitions. This transformed
  dictionary is then passed to `messages_util.DictToMessageWithErrorCheck` to
  convert it into the actual proto message.

  Args:
    kind: str, The kind of the resource (e.g., 'Target').
    name: str, The name of the resource.
    manifest: dict[str, Any], The cloud deploy resource YAML definition as a
      dict.
    project: str, The GCP project ID.
    region: str, The ID of the location.
    messages: The module containing the definitions of messages for Cloud
      Deploy.

  Returns:
    The parsed resource as a message object (e.g., messages.Target), or an
    empty dictionary if the kind is not TARGET_KIND_V1BETA1.

  Raises:
    exceptions.CloudDeployConfigError: If there are errors in the manifest
      because of invalid data.
  """
  ApplyTransforms(manifest, _PARSE_TRANSFORMS, kind, name, project, region)
  try:
    resource = messages_util.DictToMessageWithErrorCheck(
        manifest, _message_types[kind](messages)
    )
  except messages_util.DecodeError as e:
    raise exceptions.CloudDeployConfigError.for_resource(
        kind, name, str(e)
    ) from e
  return resource


def AddApiVersionAndKind(
    value: Any, transform_context: _TransformContext
) -> None:
  """Adds the API version and kind to the manifest."""
  del value
  transform_context.manifest['apiVersion'] = API_VERSION_V1
  transform_context.manifest['kind'] = transform_context.kind.value


def _RemoveField(value: Any, transform_context: _TransformContext) -> None:
  """Removes the field from the manifest."""
  del value
  del transform_context.manifest[transform_context.field]


def _MetadataProtoToYaml(
    value: Any, transform_context: _TransformContext
) -> None:
  """Converts the metadata proto to YAML."""
  del value
  transform_context.manifest['metadata'] = {}
  for k in METADATA_FIELDS:
    if k in transform_context.manifest:
      transform_context.manifest['metadata'][k] = (
          transform_context.manifest.get(k)
      )


def _ConvertAutomationWaitSecToMin(
    wait: str, transform_context: _TransformContext
) -> str:
  del transform_context
  if not wait:
    return wait
  seconds = wait[:-1]
  # convert the minute to second
  mins = int(seconds) // 60
  return '%sm' % mins


def ConvertPolicyOneTimeWindowToYamlFormat(
    one_time_window: dict[str, Any], transform_context: _TransformContext
) -> dict[str, Any]:
  """Exports the oneTimeWindows field of the Deploy Policy resource."""
  one_time = {}
  start_date_time = _DateTimeIsoString(
      one_time_window['startDate'],
      one_time_window['startTime'],
      transform_context,
  )
  end_date_time = _DateTimeIsoString(
      one_time_window['endDate'], one_time_window['endTime'], transform_context
  )
  one_time['start'] = start_date_time
  one_time['end'] = end_date_time
  return one_time


def _DateTimeIsoString(
    date_obj: dict[str, str],
    time_obj: dict[str, str],
    transform_context: _TransformContext,
) -> str:
  """Converts a date and time to a string."""
  date_str = _FormatDate(date_obj)
  time_str = ConvertTimeProtoToString(time_obj, transform_context)
  return f'{date_str} {time_str}'


def _FormatDate(date: dict[str, str]) -> str:
  """Converts a date object to a string."""
  return f"{date['year']:04}-{date['month']:02}-{date['day']:02}"


def ConvertTimeProtoToString(
    time_obj: dict[str, str], transform_context: _TransformContext
) -> str:
  """Converts a time object to a string."""
  del transform_context
  hours = time_obj.get('hours') or 0
  minutes = time_obj.get('minutes') or 0
  time_str = f'{hours:02}:{minutes:02}'
  if time_obj.get('seconds') or time_obj.get('nanos'):
    seconds = time_obj.get('seconds') or 0
    time_str += f':{seconds:02}'
  if time_obj.get('nanos'):
    millis = time_obj.get('nanos') / 1000000
    time_str += f'.{millis:03.0f}'
  return time_str


_EXPORT_TRANSFORMS = [
    TransformConfig(
        kinds=set(ResourceKind),
        # Using the always-present `name` field to make sure this transform is
        # always applied.
        fields=['name'],
        move=AddApiVersionAndKind,
    ),
    TransformConfig(
        kinds=set(ResourceKind) - set([ResourceKind.AUTOMATION]),
        fields=['name'],
        # Strip everything before the last `/` character
        replace=lambda value, transform_context: re.sub(r'.*/', '', value),
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=['name'],
        replace=lambda value, transform_context: re.sub(
            # Keep only the deliveryPipeline ID and the automation ID:
            #   projects/.../locations/.../deliveryPipelines/foo/automations/bar
            # becomes:
            #   foo/bar
            r'.*/deliveryPipelines/([^/]+)/automations/',
            '\\1/',
            value,
        ),
    ),
    TransformConfig(
        kinds=set(ResourceKind),
        fields=['name'],
        move=_MetadataProtoToYaml,
    ),
    TransformConfig(
        kinds=set(ResourceKind),
        fields=EXCLUDE_FIELDS,
        move=_RemoveField,
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=[
            'rules[].advanceRolloutRule.wait',
            'rules[].promoteReleaseRule.wait',
            'rules[].repairRolloutRule.repairPhases[].retry.wait',
        ],
        replace=_ConvertAutomationWaitSecToMin,
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=[
            'rules[].repairRolloutRule.repairPhases[].retry.backoffMode',
        ],
        replace=lambda value, transform_context: value.removeprefix(
            'BACKOFF_MODE_'
        ),
    ),
    TransformConfig(
        kinds=[ResourceKind.AUTOMATION],
        fields=[
            'rules[].repairRolloutRule.repairPhases[].retry.attempts',
        ],
        # apitools converts this to a string, annoyingly. Presumably because
        # it's an int64 field and Python doesn't support ints that big? But the
        # server caps it at 10 anyway.
        replace=lambda value, transform_context: int(value),
    ),
    TransformConfig(
        kinds=[ResourceKind.DEPLOY_POLICY],
        fields=['rules[].rolloutRestriction.timeWindows.oneTimeWindows[]'],
        replace=ConvertPolicyOneTimeWindowToYamlFormat,
    ),
    TransformConfig(
        kinds=[ResourceKind.DEPLOY_POLICY],
        fields=[
            'rules[].rolloutRestriction.timeWindows.weeklyWindows[].startTime',
            'rules[].rolloutRestriction.timeWindows.weeklyWindows[].endTime',
        ],
        replace=ConvertTimeProtoToString,
    ),
]


def ProtoToManifest(
    resource: Any, resource_ref: resources.Resource, kind: ResourceKind
) -> dict[str, Any]:
  """Converts a resource message to a cloud deploy resource manifest.

  The manifest can be applied by 'deploy apply' command.

  Args:
    resource: message in googlecloudsdk.generated_clients.apis.clouddeploy.
    resource_ref: cloud deploy resource object.
    kind: kind of the cloud deploy resource

  Returns:
    A dictionary that represents the cloud deploy resource.
  """
  manifest = encoding.MessageToDict(resource)
  ApplyTransforms(
      manifest,
      _EXPORT_TRANSFORMS,
      kind,
      resource_ref.Name(),
      resource_ref.projectsId,
      resource_ref.locationsId,
  )
  return manifest


def ApplyTransforms(
    manifest: dict[str, Any],
    transforms: list[TransformConfig],
    kind: str,
    name: str,
    project: str,
    region: str,
) -> None:
  """Applies a set of transformations to the manifest."""
  for transform in transforms:
    if kind not in transform.kinds:
      continue
    for field_name in transform.fields:
      value = _GetValue(field_name, manifest)
      if value:
        transform_context = _TransformContext(
            kind,
            name,
            project,
            region,
            manifest,
            _GetFinalFieldName(field_name),
        )
        if transform.replace:
          new_value = _TransformNestedListData(
              value,
              lambda data, current_field=transform, current_transform_context=transform_context: current_field.replace(
                  data, current_transform_context
              ),
              transform_context,
              transform.schema,
          )
          _SetValue(manifest, field_name, new_value)
        else:
          if transform.schema:
            try:
              jsonschema.validate(schema=transform.schema, instance=value)
            except jsonschema.exceptions.ValidationError as e:
              raise exceptions.CloudDeployConfigError.for_resource_field(
                  kind, name, field_name, e.message
              ) from e
          transform.move(value, transform_context)


def _GetValue(path: str, manifest: dict[str, Any]) -> Any:
  """Gets the value at a dot-separated path in a dictionary.

  If the path contains [], it returns a list of values at the path. If the path
  contains multiple [], it will return nested lists. None values will appear
  where there are missing values.

  Args:
    path: str, The dot-separated path to the value.
    manifest: dict, The dictionary to search.

  Returns:
    The value at the path, or None if it doesn't exist.
  """
  if '[]' in path:
    pre, post = path.split('[]', 1)
    post = post.lstrip('.')
    current_list = _GetValue(pre, manifest)
    if not isinstance(current_list, list):
      return None  # Path before [] did not lead to a list

    results = []
    if not post:  # If there's no path after [], return the list items
      return current_list

    for item in current_list:
      result = _GetValue(post, item)
      results.append(result)
    return results

  keys = path.split('.')
  current = manifest
  for key in keys:
    if isinstance(current, dict) and key in current:
      current = current[key]
    elif isinstance(current, list):
      # Handle cases where a list is encountered but not specified with []
      return [_GetValue(key, item) for item in current]
    else:
      return None
  return current


def _TransformNestedListData(
    data: Any,
    func: Callable[[Any], Any],
    transform_context: _TransformContext = None,
    schema: Optional[Any] = None,
) -> Any:
  """Recursively applies a function to elements in a nested structure (lists or single items).

  Args:
    data: The nested structure to process.
    func: The callable to apply to non-list, non-None elements.
    transform_context: The parse context for the data.
    schema: the schema for the data

  Returns:
    A new nested structure with the function applied.
  """
  if data is None:
    return None
  if isinstance(data, list):
    new_list = []
    for item in data:
      new_list.append(
          _TransformNestedListData(item, func, transform_context, schema)
      )
    return new_list
  else:
    if schema:
      try:
        jsonschema.validate(schema=schema, instance=data)
      except jsonschema.exceptions.ValidationError as e:
        raise exceptions.CloudDeployConfigError.for_resource_field(
            transform_context.kind,
            transform_context.name,
            transform_context.field,
            e.message,
        ) from e
    return func(data)


def _SetValue(manifest: dict[str, Any], path: str, value: Any) -> None:
  """Sets the value at a dot-separated path in a dictionary.

  If value is None, deletes the value at path.
  If path contains [], it updates elements within the list. Since this is a
  companion to _GetValue, it processes nested lists the same way.

  Args:
    manifest: dict, The dictionary to update.
    path: str, The dot-separated path to the value.
    value: Any, The value to set, or None to delete.

  Raises:
    exceptions.ManifestTransformException: a mismatch between the provided path
      and the structure of the manifest or the value being set.
  """
  if '[]' in path:
    pre, post = path.split('[]', 1)
    post = post.lstrip('.')
    current_list = _GetValue(pre, manifest)

    if not isinstance(current_list, list):
      raise exceptions.ManifestTransformException(
          f'Path "{pre}" did not lead to a list in _SetValue for path "{path}"'
      )

    if not post:
      # If post is empty, replace the current list with the new value
      if not isinstance(value, list):
        raise exceptions.ManifestTransformException(
            f'New value must be a list to replace list at "{pre}"'
        )
      # This is tricky. We need to replace the list in the parent.
      keys = pre.split('.')
      parent = manifest
      for key in keys[:-1]:
        parent = parent[key]
      parent[keys[-1]] = value
      return

    if not isinstance(value, list):
      raise exceptions.ManifestTransformException(
          f'New value must be a list when path "{path}" contains []'
      )
    if len(current_list) != len(value):
      raise exceptions.ManifestTransformException(
          f'List length mismatch: len(current_list)={len(current_list)},'
          f' len(value)={len(value)} for path "{path}"'
      )

    for i, item in enumerate(current_list):
      current_value = value[i]
      if current_value is None:
        continue
      _SetValue(item, post, current_value)
    return

  keys = path.split('.')
  current = manifest
  for key in keys[:-1]:
    if key not in current:
      # Path doesn't exist, do nothing.
      return
    if not isinstance(current[key], dict):
      raise exceptions.ManifestTransformException(
          f'Value at key "{key}" is not a dictionary for path "{path}"'
      )
    current = current[key]

  last_key = keys[-1]
  if value is None:
    if last_key in current:
      del current[last_key]
  else:
    current[last_key] = value


def _GetFinalFieldName(path: str) -> str:
  """Returns the final field name from a dot-separated path."""
  keys = path.split('.')
  final_field_name = keys[-1]
  if final_field_name.endswith('[]'):
    final_field_name = final_field_name.removesuffix('[]')
  return final_field_name