HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/domains/util.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""General utilties for Cloud Domains commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import re

from apitools.base.py import encoding

from googlecloudsdk.api_lib.domains import operations
from googlecloudsdk.api_lib.domains import registrations
from googlecloudsdk.command_lib.domains import flags
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import files

LOCATIONS_COLLECTION = 'domains.projects.locations'
OPERATIONS_COLLECTION = 'domains.projects.locations.operations'
REGISTRATIONS_COLLECTION = 'domains.projects.locations.registrations'
_PROJECT = lambda: properties.VALUES.core.project.Get(required=True)
_MAX_LIST_BATCH_SIZE = 200


def RegistrationsUriFunc(api_version):
  def UriFunc(resource):
    return ParseRegistration(api_version, resource.name).SelfLink()
  return UriFunc


def AssertRegistrationOperational(api_version, registration):
  messages = registrations.GetMessagesModule(api_version)

  if registration.state not in [
      messages.Registration.StateValueValuesEnum.ACTIVE,
      messages.Registration.StateValueValuesEnum.SUSPENDED
  ]:
    raise exceptions.Error(
        'The registration resource must be in state ACTIVE or SUSPENDED, '
        'not \'{}\'.'.format(registration.state))


def ParseMessageFromYamlFile(path, message_type, error_message):
  """Parse a Yaml file.

  Args:
    path: Yaml file path. If path is None returns None.
    message_type: Message type to parse YAML into.
    error_message: Error message to print in case of parsing error.

  Returns:
    parsed message of type message_type.
  """
  if path is None:
    return None
  raw_message = yaml.load_path(path)
  try:
    parsed_message = encoding.PyValueToMessage(message_type, raw_message)
  except Exception as e:
    # This error may be slightly different in Py2 and Py3.
    raise exceptions.Error('{}: {}'.format(error_message, e))

  unknown_fields = []
  for message in encoding.UnrecognizedFieldIter(parsed_message):
    outer_message = ''.join([edge.field + '.' for edge in message[0]])
    unknown_fields += [outer_message + field for field in message[1]]
  unknown_fields.sort()
  if unknown_fields:
    raise exceptions.Error(
        ('{}.\nProblematic fields: \'{}\'').format(error_message,
                                                   ', '.join(unknown_fields)))

  return parsed_message


def NormalizeResourceName(domain):
  """Normalizes domain name in resource name."""
  parts = domain.split('/')
  parts[-1] = NormalizeDomainName(parts[-1])
  return '/'.join(parts)


def NormalizeDomainName(domain):
  """Normalizes domain name (including punycoding)."""
  if not domain:
    raise exceptions.Error('Empty domain name')
  try:
    normalized = domain.encode('idna').decode()  # To Punycode
    normalized = normalized.lower().rstrip('.')
  except UnicodeError as e:
    raise exceptions.Error('Invalid domain name \'{}\': {}.'.format(domain, e))
  return normalized


def PunycodeToUnicode(domain):
  return domain.encode('utf-8').decode('idna')


def ValidateDomainName(domain):
  if not domain:
    return False
  # Replace with some library function for FQDN validation.
  pattern = r'^[a-z0-9-]+(\.[a-z0-9-]+)+\.{0,1}$'
  if not re.match(pattern, domain) or '..' in domain:
    return False
  return True


def ValidateNonEmpty(s):
  return s is not None and bool(s.strip())


def ValidateRegionCode(rc):
  return rc is not None and len(rc) == 2 and rc.isalpha() and rc.isupper()


def ValidateEmail(email):
  if not email:
    return False
  # Replace with some library function for email validation.
  pattern = r'^[^@\s]+@[^@\s]+\.[^@\s]+$'
  return bool(re.match(pattern, email))


def Prompt(prompt_string, message=None):
  """Prompt for user input.

  Args:
    prompt_string: Message to print in the line with prompt.
    message: Optional message to print before prompt.

  Returns:
    User provided value.
  """
  if message:
    log.status.Print(message)
  return console_io.PromptResponse(prompt_string)


def PromptWithValidator(prompt_string,
                        validator,
                        error_message,
                        message=None,
                        default=None):
  """Prompt for user input and validate output.

  Args:
    prompt_string: Message to print in the line with prompt.
    validator: Validation function (str) -> bool.
    error_message: Message to print if provided value is not correct.
    message: Optional message to print before prompt.
    default: Optional default value.

  Returns:
    Valid user provided value or default if not None and user chose it.
  """
  if message:
    log.status.Print(message)
  while True:
    if default is not None:
      answer = console_io.PromptWithDefault(
          message=prompt_string, default=default)
      if not answer:
        return default
    else:
      answer = console_io.PromptResponse(prompt_string)
    if validator(answer):
      return answer
    else:
      log.status.Print(error_message)


def GetRegistry(api_version):
  registry = resources.REGISTRY.Clone()
  registry.RegisterApiByName('domains', api_version)
  return registry


def ParseRegistration(api_version, registration):
  return GetRegistry(api_version).Parse(
      registration,
      params={
          'projectsId': _PROJECT,
          'locationsId': 'global'
      },
      collection=REGISTRATIONS_COLLECTION)


def ParseOperation(api_version, operation):
  return GetRegistry(api_version).Parse(
      operation,
      params={
          'projectsId': _PROJECT,
          'locationsId': 'global'
      },
      collection=OPERATIONS_COLLECTION)


def DomainNamespace(domain):
  # Return everything after the first encountered dot.
  # This is needed to accommodate two-level domains like .co.uk.
  return domain[domain.find('.'):]


def ParseTransferLockState(api_version, transfer_lock_state):
  messages = registrations.GetMessagesModule(api_version)
  if transfer_lock_state is None:
    return None
  return flags.TransferLockEnumMapper(messages).GetEnumForChoice(
      transfer_lock_state)


def PromptForEnum(enum_mapper, enum_type, current_value):
  """Prompts the user for the new enum_type value.

  Args:
    enum_mapper: Instance of the EnumMapper.
    enum_type: A string with enum type name to print.
    current_value: Current value of the enum.

  Returns:
    The new enum choice or None if the enum shouldn't be updated.
  """
  options = list(enum_mapper.choices)
  update = console_io.PromptContinue(
      f'Your current {enum_type} is: {current_value}.',
      'Do you want to change it',
      default=False,
  )
  if not update:
    return None

  current_choice = 0
  for i, enum in enumerate(options):
    if enum == enum_mapper.GetChoiceForEnum(current_value):
      current_choice = i
  index = console_io.PromptChoice(
      options=options,
      default=current_choice,
      message=f'Specify new {enum_type}',
  )
  return options[index]


def PromptForTransferLockState(api_version, transfer_lock):
  """Prompts the user for new transfer lock state."""
  messages = registrations.GetMessagesModule(api_version)
  enum_mapper = flags.TransferLockEnumMapper(messages)
  result = PromptForEnum(enum_mapper, 'Transfer Lock state', transfer_lock)
  if result is None:
    return None
  return ParseTransferLockState(api_version, result)


def ParseRenewalMethod(api_version, renewal_method):
  messages = registrations.GetMessagesModule(api_version)
  if renewal_method is None:
    return None
  return flags.RenewalMethodEnumMapper(messages).GetEnumForChoice(
      renewal_method
  )


def PromptForRenewalMethod(api_version, preferred_renewal_method):
  """Prompts the user for new renewal method."""
  messages = registrations.GetMessagesModule(api_version)
  enum_mapper = flags.RenewalMethodEnumMapper(messages)
  result = PromptForEnum(
      enum_mapper, 'preferred Renewal Method', preferred_renewal_method
  )
  if result is None:
    return None
  return ParseRenewalMethod(api_version, result)


def PromptForAuthCode():
  """Prompts the user to enter the auth code."""
  message = ('Please provide the authorization code from the domain\'s current '
             'registrar to transfer the domain.')

  log.status.Print(message)
  auth_code = console_io.PromptPassword(
      prompt='Authorization code:  ',
      error_message=' Authorization code must not be empty.',
      validation_callable=ValidateNonEmpty)
  return auth_code


def TransformMoneyType(r):
  if r is None:
    return None
  dr = r
  if not isinstance(dr, dict):
    dr = encoding.MessageToDict(r)
  return '{}.{:02d} {}'.format(dr['units'], int(dr.get('nanos', 0) / (10**7)),
                               dr.get('currencyCode', ''))


def _ParseMoney(money):
  """Parses money string as tuple (units, cents, currency)."""
  match = re.match(r'^(\d+|\d+\.\d{2})\s*([A-Z]{3})$', money)
  if match:
    number, s = match.groups()
  else:
    raise ValueError('Value could not be parsed as number + currency code')
  if '.' in number:
    index = number.find('.')
    return int(number[:index]), int(number[index + 1:]), s
  else:
    return int(number), 0, s


def ParseYearlyPrice(api_version, price_string):
  """Parses money string as type Money."""
  if not price_string:
    return None
  try:
    units, cents, currency = _ParseMoney(price_string)
  except ValueError:
    raise exceptions.Error(
        (
            f"Yearly price '{price_string}' is invalid. Please specify the"
            ' amount followed by the currency code.'
        )
    )

  if currency == '$':
    currency = 'USD'

  messages = registrations.GetMessagesModule(api_version)
  return messages.Money(
      units=int(units), nanos=cents * 10**7, currencyCode=currency)


def EqualPrice(x, y):
  if x.nanos is None:
    x.nanos = 0
  if y.nanos is None:
    y.nanos = 0
  return x == y


def PromptForYearlyPriceAck(price):
  """Asks the user to accept the yearly price."""
  ack = console_io.PromptContinue(
      'Yearly price: {}\n'.format(TransformMoneyType(price)),
      prompt_string='Do you agree to pay this yearly price for your domain',
      throw_if_unattended=True,
      cancel_on_no=True,
      default=False)
  if ack:
    return price
  else:
    return None


def ParseRegisterNotices(notices):
  """Parses registration notices.

  Args:
    notices: list of notices (lowercase-strings).

  Returns:
    Pair (public privacy ack: bool, hsts ack: bool).
  """
  if not notices:
    return False, False
  return 'public-contact-data-acknowledgement' in notices, 'hsts-preloaded' in notices


def PromptForHSTSAck(domain):
  ack = console_io.PromptContinue(
      ('{} is a secure namespace. You may purchase {} now but it will '
       'require an SSL certificate for website connection.').format(
           DomainNamespace(domain), domain),
      throw_if_unattended=True,
      cancel_on_no=True,
      default=False)
  return ack


def WaitForOperation(api_version, response, asynchronous):
  """Handles waiting for the operation and printing information about it.

  Args:
    api_version: Cloud Domains API version to call.
    response: Response from the API call
    asynchronous: If true, do not wait for the operation

  Returns:
    The last information about the operation.
  """
  operation_ref = ParseOperation(api_version, response.name)
  if asynchronous:
    log.status.Print('Started \'{}\''.format(operation_ref.Name()))
  else:
    message = 'Waiting for \'{}\' to complete'
    operations_client = operations.Client.FromApiVersion(api_version)
    response = operations_client.WaitForOperation(
        operation_ref, message.format(operation_ref.Name()))
  return response


def ReadFileContents(path):
  """Reads the text contents from the given path.

  Args:
    path: str, The file path to read.

  Raises:
    Error: If the file cannot be read.

  Returns:
    str, The text string read from the file.
  """
  if not path:
    return None
  return files.ReadFileContents(path)


def GetListBatchSize(args):
  """Returns the batch size for listing resources."""
  if args.page_size:
    return args.page_size
  elif args.limit:
    return min(args.limit, _MAX_LIST_BATCH_SIZE)
  else:
    return None