File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/dns/util.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for DNS commands."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from googlecloudsdk.api_lib.dns import util as api_util
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.command_lib.dns import flags
import ipaddr
def IsIPv4(ip: str) -> bool:
"""Returns True if ip is an IPv4."""
try:
ipaddr.IPv4Address(ip)
return True
except ValueError:
return False
def IsIPv6(ip: str) -> bool:
"""Returns True if ip is an IPv6."""
try:
ipaddr.IPv6Address(ip)
return True
except ValueError:
return False
def ParseKey(algorithm, key_length, key_type, messages):
"""Generate a keyspec from the given (unparsed) command line arguments.
Args:
algorithm: (str) String mnemonic for the DNSSEC algorithm to be specified in
the keyspec; must be a value from AlgorithmValueValuesEnum.
key_length: (int) The key length value to include in the keyspec.
key_type: (KeyTypeValueValuesEnum) Enum value for whether to create a
keyspec for a KSK or a ZSK.
messages: (module) Module (generally auto-generated by the API build rules)
containing the API client's message classes.
Returns:
A messages.DnsKeySpec instance created from the given arguments.
"""
key_spec = None
if algorithm is not None or key_length is not None:
spec_args = {}
spec_args['keyType'] = key_type
if algorithm is not None:
spec_args['algorithm'] = messages.DnsKeySpec.AlgorithmValueValuesEnum(
algorithm)
if key_length is not None:
spec_args['keyLength'] = key_length
if spec_args:
key_spec = messages.DnsKeySpec(**spec_args)
return key_spec
def ParseDnssecConfigArgs(args, messages, api_version='v1'):
# TODO(b/215745011) Clean up this function once move to v2
"""Parse all relevant command line arguments and generate a DNSSEC config.
Args:
args: (dict{str,(str|int)}) Dict of command line arguments; value type
dependent on particular command line argument.
messages: (module) Module (generally auto-generated by the API build rules)
containing the API client's message classes.
api_version: api_version that this function should use.
Returns:
A messages.ManagedZoneDnsSecConfig instance populated from the given
command line arguments.
"""
dnssec_config = None
key_specs = []
ksk_algorithm = None
if args.ksk_algorithm:
ksk_algorithm = flags.GetKeyAlgorithmFlagMapper(
'ksk',
messages,
).GetEnumForChoice(args.ksk_algorithm)
zsk_algorithm = None
if args.zsk_algorithm:
zsk_algorithm = flags.GetKeyAlgorithmFlagMapper(
'zsk',
messages,
).GetEnumForChoice(args.zsk_algorithm)
if api_version == 'v2':
key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.KEY_SIGNING
else:
key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.keySigning
ksk_key = ParseKey(ksk_algorithm, args.ksk_key_length, key_enum, messages)
if ksk_key is not None:
key_specs.append(ksk_key)
if api_version == 'v2':
key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.ZONE_SIGNING
else:
key_enum = messages.DnsKeySpec.KeyTypeValueValuesEnum.zoneSigning
zsk_key = ParseKey(zsk_algorithm, args.zsk_key_length, key_enum, messages)
if zsk_key is not None:
key_specs.append(zsk_key)
dnssec_config_args = {}
if key_specs:
dnssec_config_args['defaultKeySpecs'] = key_specs
if getattr(args, 'denial_of_existence', None) is not None:
dnssec_config_args['nonExistence'] = (
flags.GetDoeFlagMapper(messages).GetEnumForChoice(
args.denial_of_existence))
if args.dnssec_state is not None:
dnssec_config_args['state'] = flags.GetDnsSecStateFlagMapper(
messages, api_version
).GetEnumForChoice(args.dnssec_state)
if dnssec_config_args:
dnssec_config = messages.ManagedZoneDnsSecConfig(**dnssec_config_args)
return dnssec_config
def ParseManagedZoneForwardingConfigWithForwardingPath(
messages, server_list=None, private_server_list=None):
"""Parses list of forwarding nameservers into ManagedZoneForwardingConfig.
Args:
messages: (module) Module (generally auto-generated by the API build rules)
containing the API client's message classes.
server_list: (list) List of IP addresses to use as forwarding targets for
the DNS Managed Zone that uses default forwarding logic (based on RFC1918
check).
private_server_list: (list) List of IP addresses to use as forwarding
targets for the DNS Managed Zone that always use the private VPC path.
Returns:
A messages.ManagedZoneForwardingConfig instance populated from the given
command line arguments.
"""
target_servers = []
default_enum = messages.ManagedZoneForwardingConfigNameServerTarget.ForwardingPathValueValuesEnum(
0)
private_enum = messages.ManagedZoneForwardingConfigNameServerTarget.ForwardingPathValueValuesEnum(
1)
if server_list is not None:
for name in server_list:
if IsIPv4(name):
target_servers.append(
messages.ManagedZoneForwardingConfigNameServerTarget(
ipv4Address=name, ipv6Address=None, forwardingPath=default_enum
)
)
elif IsIPv6(name):
target_servers.append(
messages.ManagedZoneForwardingConfigNameServerTarget(
ipv4Address=None, ipv6Address=name, forwardingPath=default_enum
)
)
else:
target_servers.append(
messages.ManagedZoneForwardingConfigNameServerTarget(
ipv4Address=None,
ipv6Address=None,
domainName=name,
forwardingPath=default_enum,
)
)
if private_server_list is not None:
for name in private_server_list:
if IsIPv4(name):
target_servers.append(
messages.ManagedZoneForwardingConfigNameServerTarget(
ipv4Address=name, ipv6Address=None,
forwardingPath=private_enum))
elif IsIPv6(name):
target_servers.append(
messages.ManagedZoneForwardingConfigNameServerTarget(
ipv4Address=None, ipv6Address=name,
forwardingPath=private_enum))
else:
target_servers.append(
messages.ManagedZoneForwardingConfigNameServerTarget(
ipv4Address=None,
ipv6Address=None,
domainName=name,
forwardingPath=private_enum,
)
)
return messages.ManagedZoneForwardingConfig(targetNameServers=target_servers)
def PolicyNetworkProcessor(parsed_value, version='v1'):
"""Build PolicyNetwork message from parsed_value."""
# Parsed Value should be a list of compute.network resources
messages = GetMessages(version)
if not parsed_value:
return []
return [
messages.PolicyNetwork(networkUrl=network_ref.SelfLink())
for network_ref in parsed_value
]
def BetaPolicyNetworkProcessor(parsed_value):
"""Build Beta PolicyNetwork message from parsed_value."""
# Parsed Value should be a list of compute.network resources
return PolicyNetworkProcessor(parsed_value, version='v1beta2')
def ResponsePolicyNetworkProcessor(parsed_value, version='v1'):
"""Build PolicyNetwork message from parsed_value."""
# Parsed value should be a list of compute.network resources
messages = GetMessages(version)
if not parsed_value:
return []
return [
messages.ResponsePolicyNetwork(networkUrl=network_ref.SelfLink())
for network_ref in parsed_value
]
def TargetNameServerType(value, version='v1'):
"""Build a single TargetNameServer based on 'value'.
Args:
value: (str) A string representation of an IPV4 ip address representing the
PrivateTargetNameServer.
version: (str) A string indicating the version of the API to be used, should
be 'v1' only before removing BetaTargetNameServerType.
Returns:
A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
populated from the given ip address.
"""
messages = GetMessages(version)
if IsIPv4(value):
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=value,
ipv6Address=None,
forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
0
),
)
else:
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=None,
ipv6Address=value,
forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
0
),
)
def BetaTargetNameServerType(value, version='v1beta2'):
"""Build a single TargetNameServer based on 'value'.
Args:
value: (str) A string representation of an IPV4 ip address representing the
PrivateTargetNameServer.
version: (str) A string indicating the version of the API to be used, should
be one of 'v1beta2' or 'v1alpha2'. This function will be removed after
promoting v6 address to GA.
Returns:
A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
populated from the given ip address.
"""
messages = GetMessages(version)
if IsIPv4(value):
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=value,
ipv6Address=None,
forwardingPath=messages
.PolicyAlternativeNameServerConfigTargetNameServer
.ForwardingPathValueValuesEnum(0))
else:
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=None,
ipv6Address=value,
forwardingPath=messages
.PolicyAlternativeNameServerConfigTargetNameServer
.ForwardingPathValueValuesEnum(0))
def PrivateTargetNameServerType(value, version='v1'):
"""Build a single PrivateTargetNameServer based on 'value'.
Args:
value: (str) A string representation of an IPV4 ip address representing the
PrivateTargetNameServer.
version: (str) A string indicating the version of the API to be used, should
be 'v1' only before removing BetaPrivateTargetNameServerType.
Returns:
A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
populated from the given ip address.
"""
messages = GetMessages(version)
if IsIPv4(value):
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=value,
ipv6Address=None,
forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
1
),
)
else:
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=None,
ipv6Address=value,
forwardingPath=messages.PolicyAlternativeNameServerConfigTargetNameServer.ForwardingPathValueValuesEnum(
1
),
)
def BetaPrivateTargetNameServerType(value, version='v1beta2'):
"""Build a single PrivateTargetNameServer based on 'value'.
Args:
value: (str) A string representation of an IPV4 ip address representing the
PrivateTargetNameServer.
version: (str) A string indicating the version of the API to be used, should
be one of 'v1beta2' or 'v1alpha2'. This function will be removed after
promoting v6 address to GA.
Returns:
A messages.PolicyAlternativeNameServerConfigTargetNameServer instance
populated from the given ip address.
"""
messages = GetMessages(version)
if IsIPv4(value):
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=value,
ipv6Address=None,
forwardingPath=messages
.PolicyAlternativeNameServerConfigTargetNameServer
.ForwardingPathValueValuesEnum(1))
else:
return messages.PolicyAlternativeNameServerConfigTargetNameServer(
ipv4Address=None,
ipv6Address=value,
forwardingPath=messages
.PolicyAlternativeNameServerConfigTargetNameServer
.ForwardingPathValueValuesEnum(1))
def ParsePolicyNetworks(value, project, version):
"""Build a list of PolicyNetworks from command line args."""
networks = ParseNetworks(value, project, version)
return PolicyNetworkProcessor(networks, version)
def ParseNetworks(value, project, version):
"""Build a list of PolicyNetworks or ResponsePolicyNetworks from command line args."""
if not value:
return []
registry = api_util.GetRegistry(version)
networks = [
registry.Parse(
network_name,
collection='compute.networks',
params={'project': project}) for network_name in value
]
return networks
def ParseResponsePolicyNetworks(value, project, version):
"""Build a list of ResponsePolicyNetworks from command line args."""
networks = ParseNetworks(value, project, version)
return ResponsePolicyNetworkProcessor(networks, version)
def ParseAltNameServers(version, server_list=None, private_server_list=None):
"""Parses list of alternative nameservers into AlternativeNameServerConfig.
Args:
version: (str) A string indicating the version of the API to be used, should
be 'v1' only before removing BetaParseAltNameServers.
server_list: (Sequence) List of IP addresses to use as forwarding targets
for the DNS managed zone that uses default forwarding logic.
private_server_list: (Sequence) List of IP addresses to use as forwarding
targets for the DNS Managed Zone that always uses the private VPC path.
Returns:
A messages.PolicyAlternativeNameServerConfig instance populated from the
given command line arguments.Only the not none server list will be parsed
and
an empty list will be returned if both are none.
"""
if not server_list and not private_server_list:
return None
messages = GetMessages(version)
result_list = []
if server_list:
result_list += [TargetNameServerType(ip, version) for ip in server_list]
if private_server_list:
result_list += [
PrivateTargetNameServerType(ip, version) for ip in private_server_list
]
return messages.PolicyAlternativeNameServerConfig(
targetNameServers=result_list)
def BetaParseAltNameServers(version,
server_list=None,
private_server_list=None):
"""Parses list of alternative nameservers into AlternativeNameServerConfig.
Args:
version: (str) A string indicating the version of the API to be used, should
be one of 'v1beta2' or 'v1alpha2'. This function will be moved after
promoting v6 address to GA.
server_list: (Sequence) List of IP addresses to use as forwarding targets
for the DNS Managed Zone that uses default forwarding logic.
private_server_list: (Sequence) List of IP addresses to use as forwarding
targets for the DNS Managed Zone that always uses the private VPC path.
Returns:
A messages.PolicyAlternativeNameServerConfig instance populated from the
given command line arguments.Only the not none server list will be parsed
and
an empty list will be returned if both are none.
"""
if not server_list and not private_server_list:
return None
messages = GetMessages(version)
result_list = []
if server_list:
result_list += [BetaTargetNameServerType(ip, version) for ip in server_list]
if private_server_list:
result_list += [
BetaPrivateTargetNameServerType(ip, version)
for ip in private_server_list
]
return messages.PolicyAlternativeNameServerConfig(
targetNameServers=result_list)
def ParseResponsePolicyRulesBehavior(args, version='v1'):
"""Parses response policy rule behavior."""
m = GetMessages(version)
if args.behavior == 'bypassResponsePolicy':
return m.ResponsePolicyRule.BehaviorValueValuesEnum.BYPASS_RESPONSE_POLICY if version == 'v2' else m.ResponsePolicyRule.BehaviorValueValuesEnum.bypassResponsePolicy
else:
return None
# TODO(b/215745011) Use this once GCloud is migrated to v2
# return flags.GetResponsePolicyRulesBehaviorFlagMapper(
# messages).GetEnumForChoice(args.behavior)
def GetMessages(version='v1'):
return apis.GetMessagesModule('dns', version)