HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/network_management/util.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for `gcloud network-management`."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import re

from googlecloudsdk.core import exceptions


class NetworkManagementError(exceptions.Error):
  """Top-level exception for all Network Management errors."""


class InvalidInputError(NetworkManagementError):
  """Exception for invalid input."""


def GetClearSingleEndpointAttrErrorMsg(endpoints, endpoint_type):
  """Creates a message to specify at least one endpoint, separated by commas and or."""
  error_msg = ["Invalid Connectivity Test. "]
  if len(endpoints) > 1:
    error_msg.append("At least one of ")

  for index, endpoint in enumerate(endpoints):
    error_msg.append(
        "--{endpoint_type}-{endpoint}".format(
            endpoint_type=endpoint_type, endpoint=endpoint
        )
    )
    if index == 0 and len(endpoints) == 2:
      error_msg.append(" or ")
    elif index == len(endpoints) - 2:
      error_msg.append(", or ")
    elif index < len(endpoints) - 2:
      error_msg.append(", ")

  error_msg.append(" must be specified.")
  return "".join(error_msg)


def AppendLocationsGlobalToParent(unused_ref, unused_args, request):
  """Add locations/global to parent path, since it isn't automatically populated by apitools."""
  request.parent += "/locations/global"
  return request


def UpdateOperationRequestNameVariable(unused_ref, unused_args, request):
  request.name += "/locations/global"
  return request


def AddFieldToUpdateMask(field, patch_request):
  """Adds name of field to update mask."""
  update_mask = patch_request.updateMask
  if not update_mask:
    patch_request.updateMask = field
  elif field not in update_mask:
    patch_request.updateMask = update_mask + "," + field
  return patch_request


def ClearEndpointValue(endpoint, endpoint_name):
  proto_endpoint_fields = {
      "cloudFunction",
      "appEngineVersion",
      "cloudRunRevision",
  }
  if endpoint_name in proto_endpoint_fields:
    # Endpoint is a proto message: Set to None.
    setattr(endpoint, endpoint_name, None)
  else:
    # Endpoint is a string (only a URI): Set to "".
    setattr(endpoint, endpoint_name, "")


def ClearSingleEndpointAttr(patch_request, endpoint_type, endpoint_name):
  """Checks if given endpoint can be removed from Connectivity Test and removes it."""
  test = patch_request.connectivityTest
  endpoint = getattr(test, endpoint_type)
  endpoint_fields = {
      "instance",
      "ipAddress",
      "gkeMasterCluster",
      "cloudSqlInstance",
      "cloudFunction",
      "appEngineVersion",
      "cloudRunRevision",
      "forwardingRule",
      "redisInstance",
      "redisCluster",
  }
  non_empty_endpoint_fields = 0
  for field in endpoint_fields:
    if getattr(endpoint, field, None):
      non_empty_endpoint_fields += 1
  if non_empty_endpoint_fields > 1 or not getattr(
      endpoint, endpoint_name, None
  ):
    ClearEndpointValue(endpoint, endpoint_name)
    setattr(test, endpoint_type, endpoint)
    patch_request.connectivityTest = test
    return AddFieldToUpdateMask(
        endpoint_type + "." + endpoint_name, patch_request
    )
  else:
    endpoints = [
        "instance",
        "ip-address",
        "gke-master-cluster",
        "cloud-sql-instance",
    ]
    if endpoint_type == "source":
      endpoints.extend([
          "cloud-function",
          "app-engine-version",
          "cloud-run-revision",
      ])
    if endpoint_type == "destination":
      endpoints.extend([
          "forwarding-rule",
          "redis-instance",
          "redis-cluster",
      ])
    raise InvalidInputError(
        GetClearSingleEndpointAttrErrorMsg(endpoints, endpoint_type)
    )


def ClearEndpointAttrs(unused_ref, args, patch_request):
  """Handles clear_source_* and clear_destination_* flags."""
  flags_and_endpoints = [
      ("clear_source_instance", "source", "instance"),
      ("clear_source_ip_address", "source", "ipAddress"),
      ("clear_source_gke_master_cluster", "source", "gkeMasterCluster"),
      ("clear_source_cloud_sql_instance", "source", "cloudSqlInstance"),
      ("clear_source_cloud_function", "source", "cloudFunction"),
      ("clear_source_app_engine_version", "source", "appEngineVersion"),
      ("clear_source_cloud_run_revision", "source", "cloudRunRevision"),
      ("clear_destination_instance", "destination", "instance"),
      ("clear_destination_ip_address", "destination", "ipAddress"),
      (
          "clear_destination_gke_master_cluster",
          "destination",
          "gkeMasterCluster",
      ),
      (
          "clear_destination_cloud_sql_instance",
          "destination",
          "cloudSqlInstance",
      ),
      ("clear_destination_forwarding_rule", "destination", "forwardingRule"),
      ("clear_destination_redis_instance", "destination", "redisInstance"),
      ("clear_destination_redis_cluster", "destination", "redisCluster"),
  ]

  for flag, endpoint_type, endpoint_name in flags_and_endpoints:
    if args.IsSpecified(flag):
      patch_request = ClearSingleEndpointAttr(
          patch_request,
          endpoint_type,
          endpoint_name,
      )

  return patch_request


def ClearSingleEndpointAttrBeta(patch_request, endpoint_type, endpoint_name):
  """Checks if given endpoint can be removed from Connectivity Test and removes it."""
  test = patch_request.connectivityTest
  endpoint = getattr(test, endpoint_type)
  endpoint_fields = {
      "instance",
      "ipAddress",
      "gkeMasterCluster",
      "cloudSqlInstance",
      "cloudFunction",
      "appEngineVersion",
      "cloudRunRevision",
      "forwardingRule",
      "redisInstance",
      "redisCluster",
  }
  non_empty_endpoint_fields = 0
  for field in endpoint_fields:
    if getattr(endpoint, field, None):
      non_empty_endpoint_fields += 1
  if non_empty_endpoint_fields > 1 or not getattr(
      endpoint, endpoint_name, None
  ):
    ClearEndpointValue(endpoint, endpoint_name)
    setattr(test, endpoint_type, endpoint)
    patch_request.connectivityTest = test
    return AddFieldToUpdateMask(
        endpoint_type + "." + endpoint_name, patch_request
    )
  else:
    endpoints = [
        "instance",
        "ip-address",
        "gke-master-cluster",
        "cloud-sql-instance",
    ]
    if endpoint_type == "source":
      endpoints.extend([
          "cloud-function",
          "app-engine-version",
          "cloud-run-revision",
      ])
    if endpoint_type == "destination":
      endpoints.extend([
          "forwarding-rule",
          "redis-instance",
          "redis-cluster",
      ])
    raise InvalidInputError(
        GetClearSingleEndpointAttrErrorMsg(endpoints, endpoint_type)
    )


def ClearEndpointAttrsBeta(unused_ref, args, patch_request):
  """Handles clear_source_* and clear_destination_* flags."""
  flags_and_endpoints = [
      ("clear_source_instance", "source", "instance"),
      ("clear_source_ip_address", "source", "ipAddress"),
      ("clear_source_gke_master_cluster", "source", "gkeMasterCluster"),
      ("clear_source_cloud_sql_instance", "source", "cloudSqlInstance"),
      ("clear_source_cloud_function", "source", "cloudFunction"),
      ("clear_source_app_engine_version", "source", "appEngineVersion"),
      ("clear_source_cloud_run_revision", "source", "cloudRunRevision"),
      ("clear_destination_instance", "destination", "instance"),
      ("clear_destination_ip_address", "destination", "ipAddress"),
      (
          "clear_destination_gke_master_cluster",
          "destination",
          "gkeMasterCluster",
      ),
      (
          "clear_destination_cloud_sql_instance",
          "destination",
          "cloudSqlInstance",
      ),
      ("clear_destination_forwarding_rule", "destination", "forwardingRule"),
      ("clear_destination_redis_instance", "destination", "redisInstance"),
      ("clear_destination_redis_cluster", "destination", "redisCluster"),
  ]

  for flag, endpoint_type, endpoint_name in flags_and_endpoints:
    if args.IsSpecified(flag):
      patch_request = ClearSingleEndpointAttrBeta(
          patch_request,
          endpoint_type,
          endpoint_name,
      )

  return patch_request


def ValidateInstanceNames(unused_ref, args, request):
  """Checks if all provided instances are in valid format."""
  flags = [
      "source_instance",
      "destination_instance",
  ]
  instance_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/zones/[-\w]+/instances/[-\w]+"
  )
  for flag in flags:
    if args.IsSpecified(flag):
      instance = getattr(args, flag)
      if not instance_pattern.match(instance):
        raise InvalidInputError(
            "Invalid value for flag {}: {}\n"
            "Expected instance in the following format:\n"
            "  projects/my-project/zones/zone/instances/my-instance".format(
                flag, instance
            )
        )

  return request


def ValidateNetworkURIs(unused_ref, args, request):
  """Checks if all provided networks are in valid format."""
  flags = [
      "source_network",
      "destination_network",
  ]
  network_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/global/networks/[-\w]+"
  )
  for flag in flags:
    if args.IsSpecified(flag):
      network = getattr(args, flag)
      if not network_pattern.match(network):
        raise InvalidInputError(
            "Invalid value for flag {}: {}\n"
            "Expected network in the following format:\n"
            "  projects/my-project/global/networks/my-network".format(
                flag, network
            )
        )

  return request


def ValidateGKEMasterClustersURIs(unused_ref, args, request):
  """Checks if all provided GKE Master Clusters URIs are in correct format."""
  flags = [
      "source_gke_master_cluster",
      "destination_gke_master_cluster",
  ]
  instance_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/(zones|locations)/[-\w]+/clusters/[-\w]+"
  )
  for flag in flags:
    if args.IsSpecified(flag):
      cluster = getattr(args, flag)
      if not instance_pattern.match(cluster):
        raise InvalidInputError(
            "Invalid value for flag {}: {}\nExpected Google Kubernetes Engine"
            " master cluster in the following format:\n "
            " projects/my-project/location/location/clusters/my-cluster".format(
                flag, cluster
            )
        )
  return request


def ValidateCloudSQLInstancesURIs(unused_ref, args, request):
  """Checks if all provided Cloud SQL Instances URIs are in correct format."""
  flags = [
      "source_cloud_sql_instance",
      "destination_cloud_sql_instance",
  ]
  instance_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/instances/[-\w]+"
  )
  for flag in flags:
    if args.IsSpecified(flag):
      instance = getattr(args, flag)
      if not instance_pattern.match(instance):
        raise InvalidInputError(
            "Invalid value for flag {}: {}\n"
            "Expected Cloud SQL instance in the following format:\n"
            "  projects/my-project/instances/my-instance".format(flag, instance)
        )
  return request


def ValidateCloudFunctionsURIs(unused_ref, args, request):
  """Checks if all provided Cloud Functions URIs are in correct format."""
  flags = [
      "source_cloud_function",
  ]
  function_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/locations/[-\w]+/functions/[-\w]+"
  )
  for flag in flags:
    if not args.IsSpecified(flag):
      continue

    function = getattr(args, flag)
    if not function_pattern.match(function):
      raise InvalidInputError(
          "Invalid value for flag {}: {}\n"
          "Expected Cloud Function in the following format:\n"
          "  projects/my-project/locations/location/functions/my-function"
          .format(flag, function)
      )
  return request


def ValidateAppEngineVersionsURIs(unused_ref, args, request):
  """Checks if all provided App Engine version URIs are in correct format."""
  flags = [
      "source_app_engine_version",
  ]
  version_pattern = re.compile(
      r"apps/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/services/[-\w]+/versions/[-\w]+"
  )
  for flag in flags:
    if not args.IsSpecified(flag):
      continue

    version = getattr(args, flag)
    if not version_pattern.match(version):
      raise InvalidInputError(
          "Invalid value for flag {}: {}\n"
          "Expected App Engine version in the following format:\n"
          "  apps/my-project/services/my-service/versions/my-version".format(
              flag, version
          )
      )
  return request


def ValidateCloudRunRevisionsURIs(unused_ref, args, request):
  """Checks if all provided Cloud Run revisions URIs are in correct format."""
  flags = [
      "source_cloud_run_revision",
  ]
  revision_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/locations/[-\w]+/revisions/[-\w]+"
  )
  for flag in flags:
    if not args.IsSpecified(flag):
      continue

    revision = getattr(args, flag)
    if not revision_pattern.match(revision):
      raise InvalidInputError(
          "Invalid value for flag {}: {}\n"
          "Expected Cloud Run revision in the following format:\n"
          "  projects/my-project/locations/location/revisions/my-revision"
          .format(flag, revision)
      )
  return request


def ValidateForwardingRulesURIs(unused_ref, args, request):
  """Checks if all provided forwarding rules URIs are in correct format."""
  flags = [
      "destination_forwarding_rule",
  ]
  forwarding_rule_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/(global|regions/[-\w]+)/forwardingRules/[-\w]+"
  )
  for flag in flags:
    if not args.IsSpecified(flag):
      continue

    forwarding_rule = getattr(args, flag)
    if not forwarding_rule_pattern.match(forwarding_rule):
      raise InvalidInputError(
          "Invalid value for flag {flag}: {forwarding_rule}\n"
          + "Expected forwarding rule in one of the following formats:\n"
          + "  projects/my-project/global/forwardingRules/my-forwarding-rule\n"
          + "  projects/my-project/regions/us-central1/forwardingRules/my-forwarding-rule"
      )
  return request


def ValidateRedisInstancesURIs(unused_ref, args, request):
  """Checks if all provided Redis Instance URIs are in correct format."""
  flags = ["destination_redis_instance"]
  redis_instance_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/locations/[-\w]+/instances/[-\w]+"
  )
  for flag in flags:
    if not args.IsSpecified(flag):
      continue

    redis_instance = getattr(args, flag)
    if not redis_instance_pattern.match(redis_instance):
      raise InvalidInputError(
          "Invalid value for flag {flag}: {redis_instance}\n"
          + "Expected Redis Instance in the following format:\n"
          + "  projects/my-project/locations/location/instances/my-redis-instance\n"
      )
  return request


def ValidateRedisClustersURIs(unused_ref, args, request):
  """Checks if all provided Redis Cluster URIs are in correct format."""
  flags = ["destination_redis_cluster"]
  redis_cluster_pattern = re.compile(
      r"projects/(?:[a-z][a-z0-9-\.:]*[a-z0-9])/locations/[-\w]+/clusters/[-\w]+"
  )
  for flag in flags:
    if not args.IsSpecified(flag):
      continue

    redis_cluster = getattr(args, flag)
    if not redis_cluster_pattern.match(redis_cluster):
      raise InvalidInputError(
          "Invalid value for flag {flag}: {redis_cluster}\n"
          + "Expected Redis Cluster in the following format:\n"
          + "  projects/my-project/locations/location/clusters/my-redis-cluster\n"
      )
  return request


def ValidateFqdn(unused_ref, args, request):
  """Checks if destination-gke-master-cluster was specified when destination-fqdn is specified."""
  if args.IsSpecified("destination_fqdn") and not args.IsSpecified(
      "destination_gke_master_cluster"
  ):
    raise InvalidInputError(
        "destination-fqdn can only be specified when"
        " destination-gke-master-cluster is specified\n"
    )
  return request