HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/privateca/resource_args.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for parsing resource arguments."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.privateca import base
from googlecloudsdk.api_lib.privateca import locations
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.calliope.concepts import concepts
from googlecloudsdk.calliope.concepts import deps
from googlecloudsdk.calliope.concepts import handlers
from googlecloudsdk.calliope.concepts import util
from googlecloudsdk.command_lib.kms import resource_args as kms_args
from googlecloudsdk.command_lib.privateca import completers as privateca_completers
from googlecloudsdk.command_lib.privateca import exceptions as privateca_exceptions
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.core import properties
import six

# Flag fallthroughs that rely on properties.
LOCATION_PROPERTY_FALLTHROUGH = deps.PropertyFallthrough(
    properties.VALUES.privateca.location
)
PROJECT_PROPERTY_FALLTHROUGH = deps.PropertyFallthrough(
    properties.VALUES.core.project
)


def CertificateTemplateAttributeConfig():
  # TODO(b/186143764): GA Autocompleters
  return concepts.ResourceParameterAttributeConfig(name='certificate template')


def CaPoolAttributeConfig(display_name='pool', fallthroughs=None):
  # TODO(b/186143764): GA Autocompleters
  return concepts.ResourceParameterAttributeConfig(
      name=display_name,
      help_text='The parent CA Pool of the {resource}.',
      fallthroughs=fallthroughs or [],
  )


def CertAttributeConfig(fallthroughs=None):
  # TODO(b/186143764): GA Autocompleters
  # Certificate is always an anchor attribute so help_text is unused.
  return concepts.ResourceParameterAttributeConfig(
      name='certificate', fallthroughs=fallthroughs or []
  )


def CertAuthorityAttributeConfig(
    arg_name='certificate_authority', fallthroughs=None
):
  fallthroughs = fallthroughs or []
  return concepts.ResourceParameterAttributeConfig(
      name=arg_name,
      help_text='The issuing certificate authority of the {resource}.',
      fallthroughs=fallthroughs,
  )


def LocationAttributeConfig(arg_name='location', fallthroughs=None):
  fallthroughs = fallthroughs or [LOCATION_PROPERTY_FALLTHROUGH]
  return concepts.ResourceParameterAttributeConfig(
      name=arg_name,
      help_text='The location of the {resource}.',
      completer=privateca_completers.LocationsCompleter,
      fallthroughs=fallthroughs,
  )


def ProjectAttributeConfig(arg_name='project', fallthroughs=None):
  """DO NOT USE THIS for most flags.

  This config is only useful when you want to provide an explicit project
  fallthrough. For most cases, prefer concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG.

  Args:
    arg_name: Name of the flag used to specify this attribute. Defaults to
      'project'.
    fallthroughs: List of deps.Fallthrough objects to provide project values.

  Returns:
    A concepts.ResourceParameterAttributeConfig for a project.
  """
  return concepts.ResourceParameterAttributeConfig(
      name=arg_name,
      help_text='The project containing the {resource}.',
      fallthroughs=fallthroughs or [],
  )


def CreateKmsKeyVersionResourceSpec():
  """Creates a resource spec for a KMS CryptoKeyVersion.

  Defaults to the location and project of the CA, specified through flags or
  properties.

  Returns:
    A concepts.ResourceSpec for a CryptoKeyVersion.
  """
  return concepts.ResourceSpec(
      'cloudkms.projects.locations.keyRings.cryptoKeys.cryptoKeyVersions',
      resource_name='key version',
      cryptoKeyVersionsId=kms_args.KeyVersionAttributeConfig(kms_prefix=True),
      cryptoKeysId=kms_args.KeyAttributeConfig(kms_prefix=True),
      keyRingsId=kms_args.KeyringAttributeConfig(kms_prefix=True),
      locationsId=LocationAttributeConfig(
          'kms-location',
          [deps.ArgFallthrough('location'), LOCATION_PROPERTY_FALLTHROUGH],
      ),
      projectsId=ProjectAttributeConfig(
          'kms-project',
          [deps.ArgFallthrough('project'), PROJECT_PROPERTY_FALLTHROUGH],
      ),
  )


def CreateCertAuthorityResourceSpec(
    display_name,
    certificate_authority_attribute='certificate_authority',
    location_attribute='location',
    location_fallthroughs=None,
    pool_id_fallthroughs=None,
    ca_id_fallthroughs=None,
):
  # TODO(b/186143764): GA Autocompleters
  return concepts.ResourceSpec(
      'privateca.projects.locations.caPools.certificateAuthorities',
      api_version='v1',
      # This will be formatted and used as {resource} in the help text.
      resource_name=display_name,
      certificateAuthoritiesId=CertAuthorityAttributeConfig(
          certificate_authority_attribute, fallthroughs=ca_id_fallthroughs
      ),
      caPoolsId=CaPoolAttributeConfig(fallthroughs=pool_id_fallthroughs),
      locationsId=LocationAttributeConfig(
          location_attribute, fallthroughs=location_fallthroughs
      ),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True,
  )


def CreateCaPoolResourceSpec(
    display_name,
    location_attribute='location',
    pool_id_fallthroughs=None,
    location_fallthroughs=None,
):
  # TODO(b/186143764): GA Autocompleters
  return concepts.ResourceSpec(
      'privateca.projects.locations.caPools',
      api_version='v1',
      # This will be formatted and used as {resource} in the help text.
      resource_name=display_name,
      caPoolsId=CaPoolAttributeConfig(fallthroughs=pool_id_fallthroughs),
      locationsId=LocationAttributeConfig(
          location_attribute, fallthroughs=location_fallthroughs
      ),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True,
  )


def CreateCertResourceSpec(display_name, id_fallthroughs=None):
  return concepts.ResourceSpec(
      'privateca.projects.locations.caPools.certificates',
      # This will be formatted and used as {resource} in the help text.
      api_version='v1',
      resource_name=display_name,
      certificatesId=CertAttributeConfig(fallthroughs=id_fallthroughs or []),
      caPoolsId=CaPoolAttributeConfig('issuer-pool'),
      locationsId=LocationAttributeConfig('issuer-location'),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=False,
  )


def CreateCertificateTemplateResourceSpec(display_name):
  # TODO(b/186143764): GA Autocompleters
  return concepts.ResourceSpec(
      'privateca.projects.locations.certificateTemplates',
      api_version='v1',
      # This will be formatted and used as {resource} in the help text.
      resource_name=display_name,
      certificateTemplatesId=CertificateTemplateAttributeConfig(),
      locationsId=LocationAttributeConfig(),
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      disable_auto_completers=True,
  )


def AddCertAuthorityPositionalResourceArg(parser, verb):
  """Add a positional resource argument for a GA Certificate Authority.

  NOTE: Must be used only if it's the only resource arg in the command.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  arg_name = 'CERTIFICATE_AUTHORITY'
  concept_parsers.ConceptParser.ForResource(
      arg_name,
      CreateCertAuthorityResourceSpec(arg_name),
      'The certificate authority {}.'.format(verb),
      required=True,
  ).AddToParser(parser)


def AddCaPoolPositionalResourceArg(parser, verb):
  """Add a positional resource argument for a CA Pool.

  NOTE: Must be used only if it's the only resource arg in the command.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  arg_name = 'CA_POOL'
  concept_parsers.ConceptParser.ForResource(
      arg_name,
      CreateCaPoolResourceSpec(arg_name),
      'The ca pool {}.'.format(verb),
      required=True,
  ).AddToParser(parser)


def AddCertPositionalResourceArg(parser, verb):
  """Add a positional resource argument for a GA Certificate.

  NOTE: Must be used only if it's the only resource arg in the command.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  arg_name = 'CERTIFICATE'
  concept_parsers.ConceptParser.ForResource(
      arg_name,
      CreateCertResourceSpec(arg_name),
      'The certificate {}.'.format(verb),
      required=True,
  ).AddToParser(parser)


def AddCertificateTemplatePositionalResourceArg(parser, verb):
  """Add a positional resource argument for a certificate template.

  NOTE: Must be used only if it's the only resource arg in the command.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  arg_name = 'CERTIFICATE_TEMPLATE'
  concept_parsers.ConceptParser.ForResource(
      arg_name,
      CreateCertificateTemplateResourceSpec(arg_name),
      'The template {}.'.format(verb),
      required=True,
  ).AddToParser(parser)


# Resource validation.


def ValidateResourceLocation(resource_ref, arg_name, version='v1'):
  """Raises an exception if the given resource is in an unsupported location."""
  supported_locations = locations.GetSupportedLocations(version=version)
  if resource_ref.locationsId not in supported_locations:
    raise exceptions.InvalidArgumentException(
        arg_name,
        'Resource is in an unsupported location. Supported locations are: {}.'
        .format(', '.join(sorted(supported_locations))),
    )


def CheckExpectedCAType(expected_type, ca, version='v1'):
  """Raises an exception if the Certificate Authority type is not expected_type.

  Args:
    expected_type: The expected type.
    ca: The ca object to check.
    version: The version of the API to check against.
  """
  ca_type_enum = base.GetMessagesModule(
      api_version=version
  ).CertificateAuthority.TypeValueValuesEnum
  if expected_type == ca_type_enum.SUBORDINATE and ca.type != expected_type:
    raise privateca_exceptions.InvalidCertificateAuthorityTypeError(
        'Cannot perform subordinates command on Root CA. Please use the'
        ' `privateca roots` command group instead.'
    )
  elif expected_type == ca_type_enum.SELF_SIGNED and ca.type != expected_type:
    raise privateca_exceptions.InvalidCertificateAuthorityTypeError(
        'Cannot perform roots command on Subordinate CA. Please use the'
        ' `privateca subordinates` command group instead.'
    )


def ValidateResourceIsCompleteIfSpecified(args, resource_arg_name):
  """Raises a ParseError if the given resource_arg_name is partially specified."""
  if not hasattr(args.CONCEPTS, resource_arg_name):
    return

  concept_info = args.CONCEPTS.ArgNameToConceptInfo(resource_arg_name)
  associated_args = [
      util.NamespaceFormat(arg)
      for arg in concept_info.attribute_to_args_map.values()
  ]

  # If none of the relevant args are specified, we're good.
  if not [arg for arg in associated_args if args.IsSpecified(arg)]:
    return

  try:
    # Re-parse this concept, but treat it as required even if it originally
    # wasn't. This will trigger a meaningful user error if it's underspecified.
    concept_info.ClearCache()
    concept_info.allow_empty = False
    concept_info.Parse(args)
  except concepts.InitializationError as e:
    raise handlers.ParseError(resource_arg_name, six.text_type(e))