HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/dns/util.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for DNS commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.dns import util as api_util
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.dns import flags

import ipaddr


def IsIPv4(ip: str) -> bool:
  """Returns True if ip is an IPv4."""
  try:
    ipaddr.IPv4Address(ip)
    return True
  except ValueError:
    return False


def IsIPv6(ip: str) -> bool:
  """Returns True if ip is an IPv6."""
  try:
    ipaddr.IPv6Address(ip)
    return True
  except ValueError:
    return False


def ParseKey(algorithm, key_length, key_type, messages):
  """Generate a keyspec from the given (unparsed) command line arguments.

  Args:
    algorithm: (str) String mnemonic for the DNSSEC algorithm to be specified in
        the keyspec; must be a value from AlgorithmValueValuesEnum.
    key_length: (int) The key length value to include in the keyspec.
    key_type: (KeyTypeValueValuesEnum) Enum value for whether to create a
        keyspec for a KSK or a ZSK.
    messages: (module) Module (generally auto-generated by the API build rules)
        containing the API client's message classes.

  Returns:
    A messages.DnsKeySpec instance created from the given arguments.
  """

  key_spec = None

  if algorithm is not None or key_length is not None:
    spec_args = {}
    spec_args['keyType'] = key_type
    if algorithm is not None:
      spec_args['algorithm'] = messages.DnsKeySpec.AlgorithmValueValuesEnum(
          algorithm)
    if key_length is not None:
      spec_args['keyLength'] = key_length

    if spec_args:
      key_spec = messages.DnsKeySpec(**spec_args)

  return key_spec


def ParseDnssecConfigArgs(args, messages, api_version='v1'):
  # TODO(b/215745011) Clean up this function once move to v2
  """Parse all relevant command line arguments and generate a DNSSEC config.

  Args:
    args: (dict{str,(str|int)}) Dict of command line arguments; value type
      dependent on particular command line argument.
    messages: (module) Module (generally auto-generated by the API build rules)
      containing the API client's message classes.
    api_version: api_version that this function should use.

  Returns:
    A messages.ManagedZoneDnsSecConfig instance populated from the given
    command line arguments.
  """

  dnssec_config = None
  key_specs = []

  ksk_algorithm = None
  if args.ksk_algorithm:
    ksk_algorithm = flags.GetKeyAlgorithmFlagMapper(
        'ksk',
        messages,
    ).GetEnumForChoice(args.ksk_algorithm)
  zsk_algorithm = None
  if args.zsk_algorithm:
    zsk_algorithm = flags.GetKeyAlgorithmFlagMapper(
        'zsk',
        messages,
    ).GetEnumForChoice(args.zsk_algorithm)

  if api_version == 'v2':
    key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.KEY_SIGNING
  else:
    key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.keySigning

  ksk_key = ParseKey(ksk_algorithm, args.ksk_key_length, key_enum, messages)
  if ksk_key is not None:
    key_specs.append(ksk_key)

  if api_version == 'v2':
    key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.ZONE_SIGNING
  else:
    key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.zoneSigning

  zsk_key = ParseKey(zsk_algorithm, args.zsk_key_length, key_enum, messages)
  if zsk_key is not None:
    key_specs.append(zsk_key)

  dnssec_config_args = {}
  if key_specs:
    dnssec_config_args['defaultKeySpecs'] = key_specs
  if getattr(args, 'denial_of_existence', None) is not None:
    dnssec_config_args['nonExistence'] = (
        flags.GetDoeFlagMapper(messages).GetEnumForChoice(
            args.denial_of_existence))
  if args.dnssec_state is not None:
    dnssec_config_args['state'] = flags.GetDnsSecStateFlagMapper(
        messages, api_version
    ).GetEnumForChoice(args.dnssec_state)
  if dnssec_config_args:
    dnssec_config = messages.ManagedZoneDnsSecConfig(**dnssec_config_args)

  return dnssec_config


def ParseManagedZoneForwardingConfigWithForwardingPath(
    messages, server_list=None, private_server_list=None):
  """Parses list of forwarding nameservers into ManagedZoneForwardingConfig.

  Args:
    messages: (module) Module (generally auto-generated by the API build rules)
      containing the API client's message classes.
    server_list: (list) List of IP addresses to use as forwarding targets for
      the DNS Managed Zone that uses default forwarding logic (based on RFC1918
      check).
    private_server_list: (list) List of IP addresses to use as forwarding
      targets for the DNS Managed Zone that always use the private VPC path.

  Returns:
    A messages.ManagedZoneForwardingConfig instance populated from the given
    command line arguments.
  """
  target_servers = []
  default_enum = messages.ManagedZoneForwardingConfigNameServerTarget.ForwardingPathValueValuesEnum(
      0)
  private_enum = messages.ManagedZoneForwardingConfigNameServerTarget.ForwardingPathValueValuesEnum(
      1)
  if server_list is not None:
    for name in server_list:
      if IsIPv4(name):
        target_servers.append(
            messages.ManagedZoneForwardingConfigNameServerTarget(
                ipv4Address=name, ipv6Address=None, forwardingPath=default_enum
            )
        )
      elif IsIPv6(name):
        target_servers.append(
            messages.ManagedZoneForwardingConfigNameServerTarget(
                ipv4Address=None, ipv6Address=name, forwardingPath=default_enum
            )
        )
      else:
        target_servers.append(
            messages.ManagedZoneForwardingConfigNameServerTarget(
                ipv4Address=None,
                ipv6Address=None,
                domainName=name,
                forwardingPath=default_enum,
            )
        )
  if private_server_list is not None:
    for name in private_server_list:
      if IsIPv4(name):
        target_servers.append(
            messages.ManagedZoneForwardingConfigNameServerTarget(
                ipv4Address=name, ipv6Address=None,
                forwardingPath=private_enum))
      elif IsIPv6(name):
        target_servers.append(
            messages.ManagedZoneForwardingConfigNameServerTarget(
                ipv4Address=None, ipv6Address=name,
                forwardingPath=private_enum))
      else:
        target_servers.append(
            messages.ManagedZoneForwardingConfigNameServerTarget(
                ipv4Address=None,
                ipv6Address=None,
                domainName=name,
                forwardingPath=private_enum,
            )
        )

  return messages.ManagedZoneForwardingConfig(targetNameServers=target_servers)


def PolicyNetworkProcessor(parsed_value, version='v1'):
  """Build PolicyNetwork message from parsed_value."""
  # Parsed Value should be a list of compute.network resources
  messages = GetMessages(version)
  if not parsed_value:
    return []

  return [
      messages.PolicyNetwork(networkUrl=network_ref.SelfLink())
      for network_ref in parsed_value
  ]


def BetaPolicyNetworkProcessor(parsed_value):
  """Build Beta PolicyNetwork message from parsed_value."""
  # Parsed Value should be a list of compute.network resources
  return PolicyNetworkProcessor(parsed_value, version='v1beta2')


def ResponsePolicyNetworkProcessor(parsed_value, version='v1'):
  """Build PolicyNetwork message from parsed_value."""
  # Parsed value should be a list of compute.network resources
  messages = GetMessages(version)
  if not parsed_value:
    return []

  return [
      messages.ResponsePolicyNetwork(networkUrl=network_ref.SelfLink())
      for network_ref in parsed_value
  ]

def TargetNameServerType(value, version='v1'):
  """Build a single TargetNameServer based on 'value'.

  Args:
    value: (str) A string representation of an IPV4 ip address representing the
      PrivateTargetNameServer.
    version: (str) A string indicating the version of the API to be used, should
      be 'v1' only before removing BetaTargetNameServerType.

  Returns:
    A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
    populated from the given ip address.
  """
  messages = GetMessages(version)
  if IsIPv4(value):
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=value,
        ipv6Address=None,
        forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
            0
        ),
    )
  else:
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=None,
        ipv6Address=value,
        forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
            0
        ),
    )


def BetaTargetNameServerType(value, version='v1beta2'):
  """Build a single TargetNameServer based on 'value'.

  Args:
    value: (str) A string representation of an IPV4 ip address representing the
      PrivateTargetNameServer.
    version: (str) A string indicating the version of the API to be used, should
      be one of 'v1beta2' or 'v1alpha2'. This function will be removed after
      promoting v6 address to GA.

  Returns:
    A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
    populated from the given ip address.
  """
  messages = GetMessages(version)
  if IsIPv4(value):
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=value,
        ipv6Address=None,
        forwardingPath=messages
        .PolicyAlternativeNameServerConfigTargetNameServer
        .ForwardingPathValueValuesEnum(0))
  else:
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=None,
        ipv6Address=value,
        forwardingPath=messages
        .PolicyAlternativeNameServerConfigTargetNameServer
        .ForwardingPathValueValuesEnum(0))


def PrivateTargetNameServerType(value, version='v1'):
  """Build a single PrivateTargetNameServer based on 'value'.

  Args:
    value: (str) A string representation of an IPV4 ip address representing the
      PrivateTargetNameServer.
    version: (str) A string indicating the version of the API to be used, should
      be 'v1' only before removing BetaPrivateTargetNameServerType.

  Returns:
    A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
    populated from the given ip address.
  """
  messages = GetMessages(version)
  if IsIPv4(value):
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=value,
        ipv6Address=None,
        forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
            1
        ),
    )
  else:
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=None,
        ipv6Address=value,
        forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
            1
        ),
    )


def BetaPrivateTargetNameServerType(value, version='v1beta2'):
  """Build a single PrivateTargetNameServer based on 'value'.

  Args:
    value: (str) A string representation of an IPV4 ip address representing the
      PrivateTargetNameServer.
    version: (str) A string indicating the version of the API to be used, should
      be one of 'v1beta2' or 'v1alpha2'. This function will be removed after
      promoting v6 address to GA.

  Returns:
    A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
    populated from the given ip address.
  """
  messages = GetMessages(version)
  if IsIPv4(value):
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=value,
        ipv6Address=None,
        forwardingPath=messages
        .PolicyAlternativeNameServerConfigTargetNameServer
        .ForwardingPathValueValuesEnum(1))
  else:
    return messages.PolicyAlternativeNameServerConfigTargetNameServer(
        ipv4Address=None,
        ipv6Address=value,
        forwardingPath=messages
        .PolicyAlternativeNameServerConfigTargetNameServer
        .ForwardingPathValueValuesEnum(1))


def ParsePolicyNetworks(value, project, version):
  """Build a list of PolicyNetworks from command line args."""
  networks = ParseNetworks(value, project, version)
  return PolicyNetworkProcessor(networks, version)


def ParseNetworks(value, project, version):
  """Build a list of PolicyNetworks or ResponsePolicyNetworks from command line args."""
  if not value:
    return []
  registry = api_util.GetRegistry(version)
  networks = [
      registry.Parse(
          network_name,
          collection='compute.networks',
          params={'project': project}) for network_name in value
  ]
  return networks


def ParseResponsePolicyNetworks(value, project, version):
  """Build a list of ResponsePolicyNetworks from command line args."""
  networks = ParseNetworks(value, project, version)
  return ResponsePolicyNetworkProcessor(networks, version)


def ParseAltNameServers(version, server_list=None, private_server_list=None):
  """Parses list of alternative nameservers into AlternativeNameServerConfig.

  Args:
    version: (str) A string indicating the version of the API to be used, should
      be 'v1' only before removing BetaParseAltNameServers.
    server_list: (Sequence) List of IP addresses to use as forwarding targets
      for the DNS managed zone that uses default forwarding logic.
    private_server_list: (Sequence) List of IP addresses to use as forwarding
      targets for the DNS Managed Zone that always uses the private VPC path.

  Returns:
    A messages.PolicyAlternativeNameServerConfig instance populated from the
    given command line arguments.Only the not none server list will be parsed
    and
    an empty list will be returned if both are none.
  """
  if not server_list and not private_server_list:
    return None
  messages = GetMessages(version)
  result_list = []
  if server_list:
    result_list += [TargetNameServerType(ip, version) for ip in server_list]
  if private_server_list:
    result_list += [
        PrivateTargetNameServerType(ip, version) for ip in private_server_list
    ]
  return messages.PolicyAlternativeNameServerConfig(
      targetNameServers=result_list)


def BetaParseAltNameServers(version,
                            server_list=None,
                            private_server_list=None):
  """Parses list of alternative nameservers into AlternativeNameServerConfig.

  Args:
    version: (str) A string indicating the version of the API to be used, should
      be one of 'v1beta2' or 'v1alpha2'. This function will be moved after
      promoting v6 address to GA.
    server_list: (Sequence) List of IP addresses to use as forwarding targets
      for the DNS Managed Zone that uses default forwarding logic.
    private_server_list: (Sequence) List of IP addresses to use as forwarding
      targets for the DNS Managed Zone that always uses the private VPC path.

  Returns:
    A messages.PolicyAlternativeNameServerConfig instance populated from the
    given command line arguments.Only the not none server list will be parsed
    and
    an empty list will be returned if both are none.
  """
  if not server_list and not private_server_list:
    return None
  messages = GetMessages(version)
  result_list = []
  if server_list:
    result_list += [BetaTargetNameServerType(ip, version) for ip in server_list]
  if private_server_list:
    result_list += [
        BetaPrivateTargetNameServerType(ip, version)
        for ip in private_server_list
    ]
  return messages.PolicyAlternativeNameServerConfig(
      targetNameServers=result_list)


def ParseResponsePolicyRulesBehavior(args, version='v1'):
  """Parses response policy rule behavior."""
  m = GetMessages(version)
  if args.behavior == 'bypassResponsePolicy':
    return m.ResponsePolicyRule.BehaviorValueValuesEnum.BYPASS_RESPONSE_POLICY if version == 'v2' else m.ResponsePolicyRule.BehaviorValueValuesEnum.bypassResponsePolicy
  else:
    return None
  # TODO(b/215745011) Use this once GCloud is migrated to v2
  # return flags.GetResponsePolicyRulesBehaviorFlagMapper(
  #    messages).GetEnumForChoice(args.behavior)


def GetMessages(version='v1'):
  return apis.GetMessagesModule('dns', version)