HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/privateca/certificates/create.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Create a certificate."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.cloudkms import cryptokeyversions
from googlecloudsdk.api_lib.privateca import base as privateca_base
from googlecloudsdk.api_lib.privateca import certificate_utils
from googlecloudsdk.api_lib.privateca import request_utils
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.calliope.concepts import deps
from googlecloudsdk.command_lib.privateca import flags
from googlecloudsdk.command_lib.privateca import key_generation
from googlecloudsdk.command_lib.privateca import pem_utils
from googlecloudsdk.command_lib.privateca import resource_args
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.command_lib.util.concepts import presentation_specs
from googlecloudsdk.core import log
from googlecloudsdk.core.util import files
import six

# Help text for the --key-output-file flag. The embedded line breaks are part
# of the text, so each list element below is exactly one output line.
_KEY_OUTPUT_HELP = '\n'.join([
    'The path where the generated private key file should be written (in PEM'
    ' format).',
    '',
    'Note: possession of this key file could allow anybody to act as this'
    " certificate's",
    'subject. Please make sure that you store this key file in a secure'
    ' location at all',
    'times, and ensure that only authorized users have access to it.',
])


def _ReadCsr(csr_file):
  """Returns the contents of the CSR file at the given path.

  Args:
    csr_file: Path of the certificate signing request file to read.

  Returns:
    Whatever files.ReadFileContents returns for that path.

  Raises:
    exceptions.BadFileException: if reading fails with files.Error, OSError
      or IOError.
  """
  try:
    csr_contents = files.ReadFileContents(csr_file)
  except (files.Error, OSError, IOError):
    # Surface a single user-facing error type regardless of the low-level
    # failure.
    message = "Could not read provided CSR file '{}'.".format(csr_file)
    raise exceptions.BadFileException(message)
  else:
    return csr_contents


def _WritePemChain(pem_cert, issuing_chain, cert_file):
  """Writes a certificate followed by its issuing chain to a file.

  Args:
    pem_cert: The certificate placed first in the output.
    issuing_chain: List of issuer certificates appended after pem_cert.
    cert_file: Path of the file to write.

  Raises:
    exceptions.BadFileException: if writing fails with files.Error, OSError
      or IOError.
  """
  try:
    chain_text = pem_utils.PemChainForOutput([pem_cert] + issuing_chain)
    files.WriteFileContents(cert_file, chain_text)
  except (files.Error, OSError, IOError):
    failure = "Could not write certificate to '{}'.".format(cert_file)
    raise exceptions.BadFileException(failure)


@base.ReleaseTracks(base.ReleaseTrack.GA)
@base.DefaultUniverseOnly
class Create(base.CreateCommand):
  r"""Create a new certificate.

  ## EXAMPLES

  To create a certificate using a CSR:

      $ {command} frontend-server-tls \
        --issuer-pool=my-pool --issuer-location=us-west1 \
        --csr=./csr.pem \
        --cert-output-file=./cert.pem \
        --validity=P30D

  To create a certificate using a client-generated key:

      $ {command} frontend-server-tls \
        --issuer-pool=my-pool --issuer-location=us-west1 \
        --generate-key \
        --key-output-file=./key \
        --cert-output-file=./cert.pem \
        --dns-san=www.example.com \
        --use-preset-profile=leaf_server_tls
  """

  @staticmethod
  def Args(parser):
    """Registers this command's flags and resource arguments on the parser.

    The flags are organised into nested groups:
      * a required, mutually exclusive persistence group
        (--cert-output-file or --validate-only);
      * a required, mutually exclusive generation group holding either the
        CSR flags or the "describe the certificate and key" flags (key
        source, subject names, x509 parameters).

    Args:
      parser: The argument parser to which flags and resource arguments are
        added.
    """
    # Exactly one of --cert-output-file / --validate-only must be given.
    persistence_group = parser.add_group(
        mutex=True, required=True, help='Certificate persistence options.'
    )
    base.Argument(
        '--cert-output-file',
        help=(
            'The path where the resulting PEM-encoded certificate chain file'
            ' should be written (ordered from leaf to root).'
        ),
        required=False,
    ).AddToParser(persistence_group)
    base.Argument(
        '--validate-only',
        help=(
            'If this flag is set, the certificate resource will not be'
            ' persisted and the returned certificate will not contain the'
            ' pem_certificate field.'
        ),
        action='store_true',
        default=False,
        required=False,
    ).AddToParser(persistence_group)

    flags.AddValidityFlag(parser, 'certificate', 'P30D', '30 days')
    labels_util.AddCreateLabelsFlags(parser)

    # The certificate is generated either from a CSR or from an explicit
    # description (key + subject + x509 parameters), never both.
    cert_generation_group = parser.add_group(
        mutex=True, required=True, help='Certificate generation method.'
    )

    csr_group = cert_generation_group.add_group(
        help='To issue a certificate from a CSR use the following:',
    )
    base.Argument(
        '--csr', help='A PEM-encoded certificate signing request file path.',
        required=True,
    ).AddToParser(csr_group)

    base.Argument(
        '--rdn-sequence-subject',
        help=(
            'If this value is set then the issued certificate will use the '
            'subject found in the CSR preserving the exact RDN sequence.'
        ),
        hidden=True,
        action='store_true',
    ).AddToParser(csr_group)

    non_csr_group = cert_generation_group.add_group(
        help='Alternatively, you may describe the certificate and key to use.'
    )
    # The key comes either from local generation (--generate-key together
    # with --key-output-file) or from --kms-key-version, which is attached to
    # this group by the concept parser further down.
    key_group = non_csr_group.add_group(
        mutex=True,
        required=True,
        help=(
            'To describe the key that will be used for this certificate, use '
            'one of the following options.'
        ),
    )
    key_generation_group = key_group.add_group(
        help='To generate a new key pair, use the following:'
    )
    base.Argument(
        '--generate-key',
        help=(
            'Use this flag to have a new RSA-2048 private key securely'
            ' generated on your machine.'
        ),
        action='store_const',
        const=True,
        default=False,
        required=True,
    ).AddToParser(key_generation_group)
    base.Argument(
        '--key-output-file', help=_KEY_OUTPUT_HELP, required=True
    ).AddToParser(key_generation_group)
    # --ca is registered on the top-level parser, so it is available for both
    # the CSR and the non-CSR generation methods.
    base.Argument(
        '--ca',
        help=(
            'The name of an existing certificate authority to use for issuing'
            ' the certificate. If omitted, a certificate authority will be'
            ' chosen from the CA pool by the service on your behalf.'
        ),
        required=False,
    ).AddToParser(parser)
    subject_group = non_csr_group.add_group(
        help='The subject names for the certificate.', required=True
    )
    flags.AddSubjectFlags(subject_group)
    # Inline x509 parameters and a preset profile are mutually exclusive.
    x509_parameters_group = non_csr_group.add_group(
        mutex=True, help='The x509 configuration used for this certificate.'
    )
    flags.AddInlineX509ParametersFlags(
        x509_parameters_group, is_ca_command=False, default_max_chain_length=0
    )
    flags.AddUsePresetProfilesFlag(x509_parameters_group)
    flags.AddSubjectKeyIdFlag(parser)

    cert_arg = 'CERTIFICATE'
    concept_parsers.ConceptParser(
        [
            presentation_specs.ResourcePresentationSpec(
                cert_arg,
                resource_args.CreateCertResourceSpec(
                    cert_arg, [Create._GenerateCertificateIdFallthrough()]
                ),
                'The name of the certificate to issue. If the certificate ID '
                'is omitted, a random identifier will be generated according '
                'to the following format: {YYYYMMDD}-{3 random alphanumeric '
                'characters}-{3 random alphanumeric characters}. The '
                'certificate ID is not required when the issuing CA pool is in '
                'the DevOps tier.',
                required=True,
            ),
            presentation_specs.ResourcePresentationSpec(
                '--template',
                resource_args.CreateCertificateTemplateResourceSpec(
                    'certificate_template'
                ),
                'The name of a certificate template to use for issuing this '
                'certificate, if desired. A template may overwrite parts of '
                'the certificate request, and the use of certificate templates '
                "may be required and/or regulated by the issuing CA Pool's CA "
                'Manager. The specified template must be in the same location '
                'as the issuing CA Pool.',
                required=False,
                prefixes=True,
            ),
            presentation_specs.ResourcePresentationSpec(
                '--kms-key-version',
                resource_args.CreateKmsKeyVersionResourceSpec(),
                'An existing KMS key version backing this certificate.',
                group=key_group,
            ),
        ],
        # The template location falls back to the certificate's issuer
        # location when it is not given explicitly.
        command_level_fallthroughs={
            '--template.location': ['CERTIFICATE.issuer-location']
        },
    ).AddToParser(parser)

    # The only time a resource is returned is when args.validate_only is set.
    parser.display_info.AddFormat('yaml(certificateDescription)')

  @classmethod
  def _GenerateCertificateIdFallthrough(cls):
    """Builds the fallthrough that supplies a generated certificate ID.

    Resets the class-level `id_fallthrough_was_used` flag to False. The flag
    is switched to True when the fallthrough function is actually invoked,
    which lets the command later tell a user-supplied ID from a generated one.

    Returns:
      A deps.Fallthrough whose function returns
      certificate_utils.GenerateCertId().
    """
    cls.id_fallthrough_was_used = False

    def FallthroughFn():
      # Record that the ID was generated rather than supplied by the user.
      cls.id_fallthrough_was_used = True
      return certificate_utils.GenerateCertId()

    return deps.Fallthrough(
        function=FallthroughFn,
        hint='certificate id will default to an automatically generated id',
        active=False,
        plural=False,
    )

  def _ValidateArgs(self, args):
    """Validates the command-line args.

    Args:
      args: The parsed command-line arguments.

    Raises:
      exceptions.OneOfArgumentsRequiredException: if both --use-preset-profile
        and --template were specified.
    """
    # NOTE(review): this rejects the case where BOTH flags are given, yet
    # reports it with a "one of arguments required" exception. The exception
    # type is kept as-is because callers may depend on it; confirm whether a
    # conflicting-arguments error was intended.
    if args.IsSpecified('use_preset_profile') and args.IsSpecified('template'):
      raise exceptions.OneOfArgumentsRequiredException(
          ['--use-preset-profile', '--template'],
          (
              'To create a certificate, please specify either a preset profile '
              'or a certificate template.'
          ),
      )

    resource_args.ValidateResourceIsCompleteIfSpecified(args, 'kms_key_version')

  @classmethod
  def _PrintWarningsForUnpersistedCert(cls, args):
    """Prints warnings if certain command-line args are used for an unpersisted cert.

    A warning is logged when the certificate ID did not come from the
    generated-ID fallthrough and/or when labels were specified.

    Args:
      args: The parsed command-line arguments.
    """
    unused_args = []
    # If the fallthrough never fired, the ID did not come from the generator,
    # so it is reported as an unused input.
    if not cls.id_fallthrough_was_used:
      unused_args.append('certificate ID')
    if args.IsSpecified('labels'):
      unused_args.append('labels')

    if unused_args:
      names = ', '.join(unused_args)
      verb = 'was' if len(unused_args) == 1 else 'were'
      log.warning(
          '{names} {verb} specified but will not be used since the '
          'issuing CA pool is in the DevOps tier, which does not expose '
          'certificate lifecycle.'.format(names=names, verb=verb)
      )

  def _GetPublicKey(self, args):
    """Fetches the public key associated with a non-CSR certificate request, as UTF-8 encoded bytes.

    With --generate-key, a new RSA-2048 key pair is generated, the private
    key is exported to --key-output-file and the public key is returned.
    Otherwise the public key of the given KMS key version is fetched and its
    PEM is converted to bytes.

    Args:
      args: The parsed command-line arguments.

    Returns:
      The public key to place in the certificate config.

    Raises:
      exceptions.OneOfArgumentsRequiredException: if neither --generate-key
        nor --kms-key-version was provided.
    """
    kms_key_version = args.CONCEPTS.kms_key_version.Parse()
    if args.generate_key:
      private_key, public_key = key_generation.RSAKeyGen(2048)
      key_generation.ExportPrivateKey(args.key_output_file, private_key)
      return public_key
    elif kms_key_version:
      public_key_response = cryptokeyversions.GetPublicKey(kms_key_version)
      # bytes(..) requires an explicit encoding in PY3.
      return (
          bytes(public_key_response.pem)
          if six.PY2
          else bytes(public_key_response.pem, 'utf-8')
      )
    else:
      # This should not happen because of the required arg group, but protects
      # in case of future additions.
      raise exceptions.OneOfArgumentsRequiredException(
          ['--csr', '--generate-key', '--kms-key-version'],
          (
              'To create a certificate, please specify either a CSR, the'
              ' --generate-key flag to create a new key, or the'
              ' --kms-key-version flag to use an existing KMS key.'
          ),
      )

  def _GenerateCertificateConfig(self, request, args):
    """Builds the CertificateConfig for a non-CSR certificate request.

    Args:
      request: The create request being assembled. Not read by this method;
        kept for interface compatibility.
      args: The parsed command-line arguments.

    Returns:
      A CertificateConfig message with the PEM public key, subject config,
      x509 config and subject key ID populated from the arguments.
    """
    public_key = self._GetPublicKey(args)

    config = self.messages.CertificateConfig()
    config.publicKey = self.messages.PublicKey()
    config.publicKey.key = public_key
    config.publicKey.format = self.messages.PublicKey.FormatValueValuesEnum.PEM
    config.subjectConfig = flags.ParseSubjectFlags(args)
    config.x509Config = flags.ParseX509Parameters(args, is_ca_command=False)
    config.subjectKeyId = flags.ParseSubjectKeyId(args, self.messages)
    return config

  def Run(self, args):
    """Issues the certificate and writes the resulting chain to a file.

    Args:
      args: The parsed command-line arguments.

    Returns:
      The certificate returned by the service when --validate-only is set;
      otherwise None.

    Raises:
      exceptions.InvalidArgumentException: if the template's location differs
        from the certificate's location.
    """
    self.client = privateca_base.GetClientInstance(api_version='v1')
    self.messages = privateca_base.GetMessagesModule(api_version='v1')

    self._ValidateArgs(args)

    cert_ref = args.CONCEPTS.certificate.Parse()
    labels = labels_util.ParseCreateArgs(
        args, self.messages.Certificate.LabelsValue
    )

    request = (
        self.messages.PrivatecaProjectsLocationsCaPoolsCertificatesCreateRequest()
    )
    request.certificate = self.messages.Certificate()
    request.certificateId = cert_ref.Name()
    request.certificate.lifetime = flags.ParseValidityFlag(args)
    request.certificate.labels = labels
    request.parent = cert_ref.Parent().RelativeName()
    request.requestId = request_utils.GenerateRequestId()
    request.validateOnly = args.validate_only
    # Only pin a specific issuing CA when the user asked for one.
    if args.IsSpecified('ca'):
      request.issuingCertificateAuthorityId = args.ca

    template_ref = args.CONCEPTS.template.Parse()
    if template_ref:
      if template_ref.locationsId != cert_ref.locationsId:
        raise exceptions.InvalidArgumentException(
            '--template',
            'The certificate template must be in the same location as the '
            'issuing CA Pool.',
        )
      request.certificate.certificateTemplate = template_ref.RelativeName()

    # A CSR carries its own key and subject; otherwise the config is built
    # from the key, subject and x509 flags.
    if args.csr:
      request.certificate.pemCsr = _ReadCsr(args.csr)
      if args.rdn_sequence_subject:
        request.certificate.subjectMode = (
            self.messages.Certificate.SubjectModeValueValuesEnum.RDN_SEQUENCE
        )
    else:
      request.certificate.config = self._GenerateCertificateConfig(
          request, args
      )

    certificate = self.client.projects_locations_caPools_certificates.Create(
        request
    )

    # Validate-only certs don't have a resource name or pem certificate.
    if args.validate_only:
      return certificate

    status_message = 'Created Certificate'

    if certificate.name:
      status_message += ' [{}]'.format(certificate.name)
    else:
      # No resource name came back, so warn about inputs that went unused.
      Create._PrintWarningsForUnpersistedCert(args)

    if certificate.pemCertificate:
      status_message += ' and saved it to [{}]'.format(args.cert_output_file)
      _WritePemChain(
          certificate.pemCertificate,
          certificate.pemCertificateChain,
          args.cert_output_file,
      )

    status_message += '.'
    log.status.Print(status_message)