HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/domains/dns_util.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DNS utilties for Cloud Domains commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import enum
import sys

from apitools.base.py import exceptions as apitools_exceptions

from googlecloudsdk.api_lib.dns import util as dns_api_util
from googlecloudsdk.api_lib.domains import registrations
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import exceptions as calliope_exceptions
from googlecloudsdk.command_lib.domains import util
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.resource import resource_printer
import six


@enum.unique
class DNSSECUpdate(enum.Enum):
  """DNSSEC update options."""
  ENABLE = enum.auto()
  DISABLE = enum.auto()
  NO_CHANGE = enum.auto()


class DnsUpdateMask(object):
  """Class with information which parts of dns_settings should be updated."""

  def __init__(self,
               name_servers=False,
               glue_records=False,
               google_domains_dnssec=False,
               custom_dnssec=False):
    self.name_servers = name_servers
    self.glue_records = glue_records
    self.google_domains_dnssec = google_domains_dnssec
    self.custom_dnssec = custom_dnssec


def ParseDNSSettings(api_version,
                     name_servers,
                     cloud_dns_zone,
                     use_google_domains_dns,
                     dns_settings_from_file,
                     domain,
                     dnssec_update=DNSSECUpdate.NO_CHANGE,
                     dns_settings=None):
  """Parses DNS settings from a flag.

  At most one of the arguments (except domain) should be non-empty.

  Args:
    api_version: Cloud Domains API version to call.
    name_servers: List of name servers
    cloud_dns_zone: Cloud DNS Zone name
    use_google_domains_dns: Information that Google Domains name servers should
      be used.
    dns_settings_from_file: Path to a yaml file with dns_settings.
    domain: Domain name corresponding to the DNS settings.
    dnssec_update: DNSSECUpdate operation.
    dns_settings: Current DNS settings. Used during Configure DNS only.

  Returns:
    A pair: (messages.DnsSettings, DnsUpdateMask) to be updated, or (None, None)
    if all the arguments are empty.
  """
  domains_messages = registrations.GetMessagesModule(api_version)
  if name_servers is not None:
    return _CustomNameServers(domains_messages, name_servers)
  if cloud_dns_zone is not None:
    nameservers, ds_records = _GetCloudDnsDetails(domains_messages,
                                                  cloud_dns_zone, domain,
                                                  dnssec_update, dns_settings)
    return _CustomNameServers(domains_messages, nameservers, ds_records)
  if use_google_domains_dns:
    return _GoogleDomainsNameServers(
        domains_messages, dnssec_update, dns_settings
    )
  if dns_settings_from_file is not None:
    return _ParseDnsSettingsFromFile(domains_messages, dns_settings_from_file)
  if dns_settings is not None and dnssec_update == DNSSECUpdate.DISABLE:
    return _DisableDnssec(domains_messages, dns_settings)
  return None, None


def _CustomNameServers(domains_messages, name_servers, ds_records=None):
  """Validates name servers and returns (dns_settings, update_mask)."""
  if not ds_records:
    ds_records = []
  normalized_name_servers = list(map(util.NormalizeDomainName, name_servers))
  for ns, normalized in zip(name_servers, normalized_name_servers):
    if not util.ValidateDomainName(normalized):
      raise exceptions.Error('Invalid name server: \'{}\'.'.format(ns))
  update_mask = DnsUpdateMask(name_servers=True, custom_dnssec=True)
  dns_settings = domains_messages.DnsSettings(
      customDns=domains_messages.CustomDns(
          nameServers=normalized_name_servers, dsRecords=ds_records))
  return dns_settings, update_mask


def _GoogleDomainsNameServers(
    domains_messages, dnssec_update, dns_settings=None
):
  """Enable Google Domains name servers and returns (dns_settings, update_mask)."""
  update_mask = DnsUpdateMask(name_servers=True, google_domains_dnssec=True)
  ds_state = (
      domains_messages.GoogleDomainsDns.DsStateValueValuesEnum
      .DS_RECORDS_UNPUBLISHED)
  if dnssec_update == DNSSECUpdate.ENABLE:
    ds_state = (
        domains_messages.GoogleDomainsDns.DsStateValueValuesEnum
        .DS_RECORDS_PUBLISHED)
  elif dnssec_update == DNSSECUpdate.NO_CHANGE:
    # If GoogleDomainsDNS is currently used, keep the current DNSSEC value.
    # Otherwise keep the default value to disable DNSSEC.
    if dns_settings is not None and dns_settings.googleDomainsDns is not None:
      ds_state = dns_settings.googleDomainsDns.dsState
  dns_settings = domains_messages.DnsSettings(
      googleDomainsDns=domains_messages.GoogleDomainsDns(dsState=ds_state))
  return dns_settings, update_mask


def _ParseDnsSettingsFromFile(domains_messages, path):
  """Parses dns_settings from a yaml file.

  Args:
    domains_messages: Cloud Domains messages module.
    path: YAML file path.

  Returns:
    Pair (DnsSettings, DnsUpdateMask) or (None, None) if path is None.
  """
  dns_settings = util.ParseMessageFromYamlFile(
      path, domains_messages.DnsSettings,
      'DNS settings file \'{}\' does not contain valid dns_settings message'
      .format(path))
  if not dns_settings:
    return None, None

  update_mask = None
  if dns_settings.googleDomainsDns is not None:
    update_mask = DnsUpdateMask(
        name_servers=True, google_domains_dnssec=True, glue_records=True)
  elif dns_settings.customDns is not None:
    update_mask = DnsUpdateMask(
        name_servers=True, custom_dnssec=True, glue_records=True)
  else:
    raise exceptions.Error(
        'dnsProvider is not present in DNS settings file \'{}\'.'.format(path))

  return dns_settings, update_mask


def _GetCloudDnsDetails(
    domains_messages, cloud_dns_zone, domain, dnssec_update, dns_settings=None
):
  """Fetches list of name servers from provided Cloud DNS Managed Zone.

  Args:
    domains_messages: Cloud Domains messages module.
    cloud_dns_zone: Cloud DNS Zone resource reference.
    domain: Domain name.
    dnssec_update: If ENABLE, try to read DNSSEC information from the Zone.
    dns_settings: Current DNS configuration (or None if resource is not yet
      created).

  Returns:
    A pair: List of name servers and a list of Ds records (or [] if e.g. the
    Zone is not signed).
  """
  # Get the managed-zone.
  dns_api_version = 'v1'
  dns = apis.GetClientInstance('dns', dns_api_version)
  dns_messages = dns.MESSAGES_MODULE
  zone_ref = dns_api_util.GetRegistry(dns_api_version).Parse(
      cloud_dns_zone,
      params={
          'project': properties.VALUES.core.project.GetOrFail,
      },
      collection='dns.managedZones',
  )

  try:
    zone = dns.managedZones.Get(
        dns_messages.DnsManagedZonesGetRequest(
            project=zone_ref.project, managedZone=zone_ref.managedZone
        )
    )
  except apitools_exceptions.HttpError as error:
    raise calliope_exceptions.HttpException(error)
  domain_with_dot = domain + '.'
  if zone.dnsName != domain_with_dot:
    raise exceptions.Error(
        "The dnsName '{}' of specified Cloud DNS zone '{}' does not match the "
        "registration domain '{}'".format(
            zone.dnsName, cloud_dns_zone, domain_with_dot
        )
    )
  if (
      zone.visibility
      != dns_messages.ManagedZone.VisibilityValueValuesEnum.public
  ):
    raise exceptions.Error(
        "Cloud DNS Zone '{}' is not public.".format(cloud_dns_zone)
    )

  if dnssec_update == DNSSECUpdate.DISABLE:
    return zone.nameServers, []
  if dnssec_update == DNSSECUpdate.NO_CHANGE:
    # If the DNS Zone is already in use keep the current config.
    if (
        dns_settings is not None
        and dns_settings.customDns is not None
        and set(dns_settings.customDns.nameServers) == set(zone.nameServers)
    ):
      return (
          dns_settings.customDns.nameServers,
          dns_settings.customDns.dsRecords,
      )
    # Otherwise disable DNSSEC
    return zone.nameServers, []

  signed = dns_messages.ManagedZoneDnsSecConfig.StateValueValuesEnum.on
  if not zone.dnssecConfig or zone.dnssecConfig.state != signed:
    log.status.Print(
        'Cloud DNS Zone \'{}\' is not signed. DNSSEC won\'t be enabled.'.format(
            cloud_dns_zone))
    return zone.nameServers, []
  try:
    dns_keys = []
    req = dns_messages.DnsDnsKeysListRequest(
        project=zone_ref.project,
        managedZone=zone_ref.managedZone)
    while True:
      resp = dns.dnsKeys.List(req)
      dns_keys += resp.dnsKeys
      req.pageToken = resp.nextPageToken
      if not resp.nextPageToken:
        break
  except apitools_exceptions.HttpError as error:
    log.status.Print('Cannot read DS records from Cloud DNS Zone \'{}\': {}. '
                     'DNSSEC won\'t be enabled.'.format(cloud_dns_zone, error))
  ds_records = _ConvertDnsKeys(domains_messages, dns_messages, dns_keys)
  if not ds_records:
    log.status.Print('No supported DS records found in Cloud DNS Zone \'{}\'. '
                     'DNSSEC won\'t be enabled.'.format(cloud_dns_zone))
    return zone.nameServers, []
  return zone.nameServers, ds_records


def _ConvertDnsKeys(domains_messages, dns_messages, dns_keys):
  """Converts DnsKeys to DsRecords."""
  ds_records = []
  for key in dns_keys:
    if key.type != dns_messages.DnsKey.TypeValueValuesEnum.keySigning:
      continue
    if not key.isActive:
      continue
    try:
      algorithm = domains_messages.DsRecord.AlgorithmValueValuesEnum(
          six.text_type(key.algorithm).upper())
      for d in key.digests:
        digest_type = domains_messages.DsRecord.DigestTypeValueValuesEnum(
            six.text_type(d.type).upper())
        ds_records.append(
            domains_messages.DsRecord(
                keyTag=key.keyTag,
                digest=d.digest,
                algorithm=algorithm,
                digestType=digest_type))
    except TypeError:
      continue  # Ignore unsupported algorithms and digest types.
  return ds_records


def _DisableDnssec(domains_messages, dns_settings):
  """Returns DNS settings (and update mask) with DNSSEC disabled."""
  if dns_settings is None:
    return None, None
  if dns_settings.googleDomainsDns is not None:
    updated_dns_settings = domains_messages.DnsSettings(
        googleDomainsDns=domains_messages.GoogleDomainsDns(
            dsState=domains_messages.GoogleDomainsDns.DsStateValueValuesEnum
            .DS_RECORDS_UNPUBLISHED))
    update_mask = DnsUpdateMask(google_domains_dnssec=True)
  elif dns_settings.customDns is not None:
    updated_dns_settings = domains_messages.DnsSettings(
        customDns=domains_messages.CustomDns(dsRecords=[]))
    update_mask = DnsUpdateMask(custom_dnssec=True)
  else:
    return None, None
  return updated_dns_settings, update_mask


def PromptForNameServers(api_version,
                         domain,
                         dnssec_update=DNSSECUpdate.NO_CHANGE,
                         dns_settings=None,
                         print_format='default'):
  """Asks the user to provide DNS settings interactively.

  Args:
    api_version: Cloud Domains API version to call.
    domain: Domain name corresponding to the DNS settings.
    dnssec_update: DNSSECUpdate operation.
    dns_settings: Current DNS configuration (or None if resource is not yet
      created).
    print_format: Print format to use when showing current dns_settings.

  Returns:
    A pair: (messages.DnsSettings, DnsUpdateMask) to be updated, or (None, None)
    if the user cancelled.
  """
  domains_messages = registrations.GetMessagesModule(api_version)
  options = [
      'Provide name servers list', 'Provide Cloud DNS Managed Zone name',
      'Use free name servers provided by Google Domains'
  ]
  if dns_settings is not None:  # Update
    log.status.Print('Your current DNS settings are:')
    resource_printer.Print(dns_settings, print_format, out=sys.stderr)

    message = (
        'You can provide your DNS settings by specifying name servers, '
        'a Cloud DNS Managed Zone name or by choosing '
        'free name servers provided by Google Domains'
    )
    cancel_option = True
    default = len(options)  # Additional 'cancel' option.
  else:
    options = options[:2]
    message = (
        'You can provide your DNS settings by specifying name servers '
        'or a Cloud DNS Managed Zone name'
    )
    cancel_option = False
    default = 1  # Cloud DNS Zone.

  index = console_io.PromptChoice(
      message=message,
      options=options,
      cancel_option=cancel_option,
      default=default)
  name_servers = []
  if index == 0:  # name servers.
    while len(name_servers) < 2:
      while True:
        ns = console_io.PromptResponse('Name server (empty line to finish):  ')
        if not ns:
          break
        if not util.ValidateDomainName(ns):
          log.status.Print('Invalid name server: \'{}\'.'.format(ns))
        else:
          name_servers += [ns]
      if len(name_servers) < 2:
        log.status.Print('You have to provide at least 2 name servers.')
    return _CustomNameServers(domains_messages, name_servers)
  elif index == 1:  # Cloud DNS.
    while True:
      zone = util.PromptWithValidator(
          validator=util.ValidateNonEmpty,
          error_message=' Cloud DNS Managed Zone name must not be empty.',
          prompt_string='Cloud DNS Managed Zone name:  ',
      )
      try:
        name_servers, ds_records = _GetCloudDnsDetails(
            domains_messages, zone, domain, dnssec_update, dns_settings
        )
      except (exceptions.Error, calliope_exceptions.HttpException) as e:
        log.status.Print(six.text_type(e))
      else:
        break
    return _CustomNameServers(domains_messages, name_servers, ds_records)
  elif index == 2:  # Google Domains name servers.
    return _GoogleDomainsNameServers(
        domains_messages, dnssec_update, dns_settings
    )
  else:
    return None, None  # Cancel.


def PromptForNameServersTransfer(api_version, domain):
  """Asks the user to provide DNS settings interactively for Transfers.

  Args:
    api_version: Cloud Domains API version to call.
    domain: Domain name corresponding to the DNS settings.

  Returns:
    A triple: (messages.DnsSettings, DnsUpdateMask, _) to be updated, or
    (None, None, _) if the user cancelled. The third value returns true when
    keeping the current DNS settings during Transfer.
  """
  domains_messages = registrations.GetMessagesModule(api_version)
  options = [
      'Provide Cloud DNS Managed Zone name',
      'Use free name servers provided by Google Domains',
      'Keep current DNS settings from current registrar'
  ]
  message = ('You can provide your DNS settings in one of several ways:\n'
             'You can specify a Cloud DNS Managed Zone name. To avoid '
             'downtime following transfer, make sure the zone is configured '
             'correctly before proceeding.\n'
             'You can select free name servers provided by Google Domains. '
             'This blank-slate option cannot be configured before transfer.\n'
             'You can also choose to keep the domain\'s DNS settings '
             'from its current registrar. Use this option only if you are '
             'sure that the domain\'s current DNS service will not cease upon '
             'transfer, as is often the case for DNS services provided for '
             'free by the registrar.')

  cancel_option = False
  default = 2  # Keep current DNS settings.

  # It's not safe to change name servers and enable DNSSEC during transfers at
  # once, so we just mark DNSSEC as disabled. However, this option is only used
  # when the customer is changing the existing DNS configuration.
  dnssec_update = DNSSECUpdate.DISABLE

  index = console_io.PromptChoice(
      message=message,
      options=options,
      cancel_option=cancel_option,
      default=default)
  if index == 0:  # Cloud DNS.
    while True:
      zone = util.PromptWithValidator(
          validator=util.ValidateNonEmpty,
          error_message=' Cloud DNS Managed Zone name must not be empty.',
          prompt_string='Cloud DNS Managed Zone name:  ')
      try:
        name_servers, ds_records = _GetCloudDnsDetails(domains_messages, zone,
                                                       domain, dnssec_update)
      except (exceptions.Error, calliope_exceptions.HttpException) as e:
        log.status.Print(six.text_type(e))
      else:
        break
    dns_settings, update_mask = _CustomNameServers(domains_messages,
                                                   name_servers, ds_records)
    return dns_settings, update_mask, False
  elif index == 1:  # Google Domains name servers.
    dns_settings, update_mask = _GoogleDomainsNameServers(
        domains_messages, dnssec_update)
    return dns_settings, update_mask, False
  else:  # Keep current DNS settings (Transfer).
    return None, None, True


def NameServersEquivalent(prev_dns_settings, new_dns_settings):
  """Checks if dns settings have equivalent name servers."""
  if prev_dns_settings.googleDomainsDns:
    return bool(new_dns_settings.googleDomainsDns)
  if prev_dns_settings.customDns:
    if not new_dns_settings.customDns:
      return False
    prev_ns = sorted(
        map(util.NormalizeDomainName, prev_dns_settings.customDns.nameServers))
    new_ns = sorted(
        map(util.NormalizeDomainName, new_dns_settings.customDns.nameServers))
    return prev_ns == new_ns

  return False


def PromptForUnsafeDnsUpdate():
  console_io.PromptContinue(
      'This operation is not safe.',
      default=False,
      throw_if_unattended=True,
      cancel_on_no=True)


def DnssecEnabled(dns_settings):
  ds_records = []
  if dns_settings.googleDomainsDns is not None:
    ds_records = dns_settings.googleDomainsDns.dsRecords
  if dns_settings.customDns is not None:
    ds_records = dns_settings.customDns.dsRecords
  return bool(ds_records)