HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/certificate_manager/hooks.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Hooks for Certificate Manager declarative commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import re

from googlecloudsdk.api_lib.certificate_manager import api_client
from googlecloudsdk.core.util import times

# Templates for relative resource names. The hooks in this module fill the
# first placeholder with `request.parent` and the second with the resource ID
# supplied on the command line.
DNS_AUTHORIZATIONS_TEMPLATE = "{}/dnsAuthorizations/{}"
ISSUANCE_CONFIG_TEMPLATE = "{}/certificateIssuanceConfigs/{}"
CA_POOL_TEMPLATE = "{}/caPools/{}"


def GetLocation():
  """Returns the fixed location name "global"."""
  location = "global"
  return location


def SetAuthorizationURL(ref, args, request):
  """Expands the dns-authorizations argument into relative resource URLs.

  Entries that already start with "projects/" are passed through untouched;
  every other entry is treated as a bare ID and prefixed with the request's
  parent via DNS_AUTHORIZATIONS_TEMPLATE.

  Args:
    ref: Reference to the membership object (unused).
    args: Command line arguments.
    request: API request to be issued.

  Returns:
    The same request, with certificate.managed.dnsAuthorizations replaced when
    dns_authorizations was provided.
  """

  del ref
  if not args:
    return request

  if args.dns_authorizations:
    request.certificate.managed.dnsAuthorizations = [
        entry if entry.startswith("projects/")
        else DNS_AUTHORIZATIONS_TEMPLATE.format(request.parent, entry)
        for entry in args.dns_authorizations
    ]

  return request


def SetIssuanceConfigURL(ref, args, request):
  """Expands the issuance-config argument into a relative resource URL.

  A value that is missing, empty, or already starts with "projects/" leaves
  the request unchanged; otherwise the value is treated as a bare ID and
  prefixed with the request's parent via ISSUANCE_CONFIG_TEMPLATE.

  Args:
    ref: Reference to the membership object (unused).
    args: Command line arguments.
    request: API request to be issued.

  Returns:
    The same request, possibly with certificate.managed.issuanceConfig set.
  """

  del ref
  if not args:
    return request

  # TODO(b/228342902): Remove once enabled in GA.
  # The flag may not exist on every release track, hence the defaulted lookup.
  issuance_config = getattr(args, "issuance_config", None)
  if issuance_config and not issuance_config.startswith("projects/"):
    request.certificate.managed.issuanceConfig = (
        ISSUANCE_CONFIG_TEMPLATE.format(request.parent, issuance_config))

  return request


def SetCAPoolURL(ref, args, request):
  """Expands the ca-pool argument into a relative resource URL.

  A value that is empty or already starts with "projects/" leaves the request
  unchanged; otherwise the value is treated as a bare ID and prefixed with the
  request's parent via CA_POOL_TEMPLATE.

  Args:
    ref: reference to the membership object (unused).
    args: command line arguments.
    request: API request to be issued

  Returns:
    The same request, possibly with the CA service config's caPool set.
  """

  del ref
  if not args:
    return request

  ca_pool = args.ca_pool
  if ca_pool and not ca_pool.startswith("projects/"):
    ca_pool_url = CA_POOL_TEMPLATE.format(request.parent, ca_pool)
    issuance_config = request.certificateIssuanceConfig
    ca_service_config = (
        issuance_config.certificateAuthorityConfig
        .certificateAuthorityServiceConfig)
    ca_service_config.caPool = ca_pool_url

  return request


def ParseIso8601LifetimeFlag(value):
  """Converts an ISO 8601 lifetime flag value to the API's duration format.

  Args:
    value: An ISO 8601 valid value.

  Returns:
    The value parsed by times.ParseDuration and re-rendered by
    times.FormatDurationForJson.
  """

  duration = times.ParseDuration(value)
  return times.FormatDurationForJson(duration)


def UpdateTrustConfigAllowlistedCertificates(ref, args, request):
  """Applies the add/remove/clear allowlisted-certificate flags to a request.

  If none of the three flags is specified the request is returned untouched.
  Otherwise "allowlistedCertificates" is appended to the update mask (unless
  it is already there), the trust config's current allowlisted certificates
  are fetched from the API, and the flags are applied on top of them in a
  fixed order: remove, then clear, then add.

  Args:
    ref: reference to the membership object (unused).
    args: command line arguments.
    request: API request to be issued

  Returns:
    modified request
  """

  del ref
  if not args:
    return request

  allowlist_flags = (
      "add_allowlisted_certificates",
      "remove_allowlisted_certificates",
      "clear_allowlisted_certificates",
  )
  if not any(args.IsSpecified(flag) for flag in allowlist_flags):
    return request

  # Tolerate an unset (None) update mask as well as an empty one, so that the
  # field can be appended without raising on `None`.
  update_mask = request.updateMask or ""
  if "allowlistedCertificates" not in update_mask:
    if update_mask:
      update_mask += ","
    request.updateMask = update_mask + "allowlistedCertificates"

  # Start from the certificates currently stored on the trust config so that
  # add/remove behave as incremental edits rather than a full replacement.
  client = api_client.GetClientInstance()
  service = client.projects_locations_trustConfigs
  messages = client.MESSAGES_MODULE
  get_trust_config_request = (
      messages.CertificatemanagerProjectsLocationsTrustConfigsGetRequest(
          name=request.name
      )
  )
  request.trustConfig.allowlistedCertificates = service.Get(
      get_trust_config_request
  ).allowlistedCertificates

  if args.IsSpecified("remove_allowlisted_certificates"):
    # Compare whitespace-normalized PEM text so that formatting differences
    # do not prevent a match. Entries without a "pemCertificate" key are
    # ignored.
    pem_certificates_to_be_removed = {
        NormalizePemCertificate(ac["pemCertificate"])
        for ac in args.remove_allowlisted_certificates
        if "pemCertificate" in ac
    }
    request.trustConfig.allowlistedCertificates = [
        ac
        for ac in request.trustConfig.allowlistedCertificates
        if NormalizePemCertificate(ac.pemCertificate)
        not in pem_certificates_to_be_removed
    ]

  if args.IsSpecified("clear_allowlisted_certificates"):
    request.trustConfig.allowlistedCertificates = []

  # Additions are applied last, so they survive a simultaneous clear.
  if args.IsSpecified("add_allowlisted_certificates"):
    request.trustConfig.allowlistedCertificates = (
        request.trustConfig.allowlistedCertificates
        + args.add_allowlisted_certificates
    )

  return request


def NormalizePemCertificate(pem_certificate):
  """Strips ASCII whitespace from a PEM certificate so two copies can be compared.

  Args:
    pem_certificate: PEM certificate to be normalized.

  Returns:
    The PEM text with every run of ASCII whitespace removed; non-ASCII
    whitespace is left in place because the pattern is compiled with re.ASCII.
  """
  ascii_whitespace = re.compile(r"\s+", flags=re.ASCII)
  return ascii_whitespace.sub("", pem_certificate)