HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/network_connectivity/util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for `gcloud network-connectivity`."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import argparse
from typing import Any

from googlecloudsdk.core import exceptions
import googlecloudsdk.generated_clients.apis.networkconnectivity.v1.networkconnectivity_v1_messages as v1
import googlecloudsdk.generated_clients.apis.networkconnectivity.v1beta.networkconnectivity_v1beta_messages as v1beta


# Constants
PROJECTS_RESOURCE_PATH = "projects/"
LOCATION_FILTER_FMT = "location:projects/{0}/locations/{1}"
ROUTE_TYPE_FILTER = "-type:DYNAMIC_ROUTE"


class NetworkConnectivityError(exceptions.Error):
  """Top-level exception for all Network Connectivity errors."""


class InvalidInputError(NetworkConnectivityError):
  """Exception for invalid input."""


# Table format for spokes list
LIST_FORMAT = """
    table(
      name.basename(),
      name.segment(3):label=LOCATION,
      hub.basename(),
      group.basename(),
      format(
        "{0}{1}{2}{3}{4}",
        linkedVpnTunnels.yesno(yes="VPN tunnel", no=""),
        linkedInterconnectAttachments.yesno(yes="VLAN attachment", no=""),
        linkedRouterApplianceInstances.yesno(yes="Router appliance", no=""),
        linkedVpcNetwork.yesno(yes="VPC network", no=""),
        gateway.yesno(yes="Gateway", no="")
      ):label=TYPE,
      firstof(linkedVpnTunnels.uris, linkedInterconnectAttachments.uris, linkedRouterApplianceInstances.instances).len().yesno(no="1"):label="RESOURCE COUNT",
      format(
        "{0}{1}",
        linkedVpcNetwork.yesno(yes="N/A", no=""),
        firstof(linkedVpnTunnels.siteToSiteDataTransfer, linkedInterconnectAttachments.siteToSiteDataTransfer, linkedRouterApplianceInstances.siteToSiteDataTransfer, Gateway).yesno(yes="On", no="")
      ).yesno(no="Off"):label="DATA TRANSFER",
      description
    )
"""

LIST_SPOKES_FORMAT = """
    table(
      name.basename(),
      group.basename(),
      name.segment(1):label=PROJECT,
      name.segment(3):label=LOCATION,
      spokeType:label=TYPE,
      state,
      reasons.code.list():label="STATE REASON",
      etag,
      format(
        "{0}{1}",
        linkedVpcNetwork.yesno(yes="N/A", no=""),
        firstof(linkedVpnTunnels.siteToSiteDataTransfer, linkedInterconnectAttachments.siteToSiteDataTransfer, linkedRouterApplianceInstances.siteToSiteDataTransfer, gateway).yesno(yes="On", no="")
      ).yesno(no="Off").if(view=detailed):label="DATA TRANSFER",
      description.if(view=detailed)
    )
"""


def AppendLocationsGlobalToParent(unused_ref, unused_args, request):
  """Add locations/global to parent path."""

  request.parent += "/locations/global"
  return request


def SetExportPscBeta(unused_ref, args, request):
  """Set legacy export_psc field based on new PSC flags."""
  if not args.IsSpecified("export_psc"):
    # If either of the new flags are specified, set export_psc to true too.
    if (
        args.export_psc_published_services_and_regional_google_apis
        or args.export_psc_global_google_apis
    ):
      request.googleCloudNetworkconnectivityV1betaHub.exportPsc = True
  return request


def DeriveProjectFromResource(resource):
  """Returns the project from a resource string."""
  if PROJECTS_RESOURCE_PATH not in resource:
    raise InvalidInputError(
        "Resource must contain a project path, but received: {0}".format(
            resource
        )
    )
  project = resource[
      resource.index(PROJECTS_RESOURCE_PATH) + len(PROJECTS_RESOURCE_PATH) :
  ]
  project = project.split("/")[0]
  return project


def AppendEffectiveLocationFilter(unused_ref, args, request):
  """Append filter to limit listing dynamic routes at an effective location."""

  if args.IsSpecified("effective_location"):
    location = args.effective_location
    project = DeriveProjectFromResource(request.parent)
    location_filter = LOCATION_FILTER_FMT.format(project, location)
    request.filter = "{0} OR {1}".format(location_filter, ROUTE_TYPE_FILTER)
  return request


def SetGlobalLocation():
  """Set default location to global."""
  return "global"


def ClearOverlaps(unused_ref, args, patch_request):
  """Handles clear_overlaps flag."""

  if args.IsSpecified("clear_overlaps"):
    if patch_request.updateMask:
      patch_request.updateMask += ",overlaps"
    else:
      patch_request.updateMask = "overlaps"
  return patch_request


def ClearLabels(unused_ref, args, patch_request):
  """Handles clear_labels flag."""

  if args.IsSpecified("clear_labels"):
    if patch_request.updateMask:
      patch_request.updateMask += ",labels"
    else:
      patch_request.updateMask = "labels"
  return patch_request


def ValidateMigration(unused_ref, args, request):
  """Validates internal range migration parameters."""
  if not args.IsSpecified("usage") or args.usage != "for-migration":
    if args.IsSpecified("migration_source") or args.IsSpecified(
        "migration_target"
    ):
      raise InvalidInputError(
          "migration_source and migration_target can only be specified when"
          " usage is set to for-migration."
      )
    else:
      return request
  # We are now in the for-migration usage case.
  if not args.IsSpecified("migration_source") or not args.IsSpecified(
      "migration_target"
  ):
    raise InvalidInputError(
        "Both migration_source and migration_target must be specified."
    )
  if args.IsSpecified("peering") and args.peering != "for-self":
    raise InvalidInputError(
        "Peering must be set to for-self when usage is set to for-migration."
    )
  if not args.migration_source:
    raise InvalidInputError("migration_source cannot be empty.")
  if not args.migration_target:
    raise InvalidInputError("migration_target cannot be empty.")
  if args.migration_source == args.migration_target:
    raise InvalidInputError(
        "migration_source and migration_target cannot be the same."
    )
  return request


def ValidateAllocationOptions(ref: Any, args: argparse.Namespace, request: Any):
  """Validates internal range allocation options."""

  del ref  # Unused.
  if (
      args.IsSpecified("allocation_strategy")
      and args.allocation_strategy == "RANDOM_FIRST_N_AVAILABLE"
  ):
    if (
        not args.IsSpecified("first_available_ranges_lookup_size")
        or args.first_available_ranges_lookup_size < 1
    ):
      raise InvalidInputError(
          "first_available_ranges_lookup_size must be set and greater than 0"
          " when allocation_strategy is RANDOM_FIRST_N_AVAILABLE."
      )
  elif args.IsSpecified("first_available_ranges_lookup_size"):
    raise InvalidInputError(
        "first_available_ranges_lookup_size can only be set when"
        " allocation_strategy is RANDOM_FIRST_N_AVAILABLE."
    )
  return request


class StoreGlobalAction(argparse._StoreConstAction):
  # pylint: disable=protected-access
  # pylint: disable=redefined-builtin
  """Return "global" if the --global argument is used."""

  def __init__(
      self, option_strings, dest, default="", required=False, help=None
  ):
    super(StoreGlobalAction, self).__init__(
        option_strings=option_strings,
        dest=dest,
        const="global",
        default=default,
        required=required,
        help=help,
    )


def SetGatewayAdvertisedRouteRecipient(unused_ref, args, request):
  """Set the route's `recipient` field based on boolean flags.

  Args:
    args: The command arguments.
    request: The request to set the `recipient` field on.

  Returns:
    The request with the `recipient` field set.
  """
  if args.advertise_to_hub:
    request.googleCloudNetworkconnectivityV1betaGatewayAdvertisedRoute.recipient = (
        v1beta.GoogleCloudNetworkconnectivityV1betaGatewayAdvertisedRoute.RecipientValueValuesEnum.ADVERTISE_TO_HUB
    )
  return request


def CheckRegionSpecifiedIfSpokeSpecified(unused_ref, unused_args, request):
  """If a spoke name is specified, then its region must also be specified.

  This is because CCFE doesn't support a wildcard ("-") in this case but returns
  a confusing error message. So we give the user a friendlier error.

  Args:
    request: The request object. We will inspect the parent field.

  Returns:
    The unmodified request object.
  Raises:
    InvalidInputError: If the region is unspecified when a spoke is.
  """
  region_wildcard = "/locations/-/" in request.parent
  spoke_wildcard = request.parent.endswith("/spokes/-")
  if region_wildcard and not spoke_wildcard:
    raise InvalidInputError(
        "A region must be specified if a spoke name is specified"
    )
  return request


def CheckForRouteTableAndHubWildcardMismatch(unused_ref, unused_args, request):
  """Check that hub and route table are both specified or both unspecified.

  This is because CCFE doesn't support wildcards ("-") in this case but returns
  a confusing error message. So we give he user a friendlier error.

  Args:
   request: The request object.

  Returns:
    The unmodified request object.
  Raises:
    InvalidInputError: If the user needs to specify a hub name or route table
    name.
  """
  hub_wildcard = "/hubs/-/" in request.parent
  route_table_wildcard = request.parent.endswith("/routeTables/-")
  if hub_wildcard and not route_table_wildcard:
    raise InvalidInputError(
        "A hub must be specified if a route table is specified"
    )
  if route_table_wildcard and not hub_wildcard:
    raise InvalidInputError(
        "A route table must be specified if a hub is specified"
    )
  return request


def ProhibitHybridInspection(unused_ref, unused_args, request):
  """Reject requests with HYBRID_INSPECTION preset topology.

  Args:
    request: A CreateHubRequest object.

  Returns:
    The unmodified request object.
  Raises:
    InvalidInputError: If the CreateHubRequest has the HYBRID_INSPECTION preset
    topology.
  """
  if not hasattr(request.hub, "presetTopology"):
    return request
  if (request.hub.presetTopology !=
      v1.Hub.PresetTopologyValueValuesEnum.HYBRID_INSPECTION):
    return request

  raise InvalidInputError(
      "HYBRID_INSPECTION unsupported in the GA component; "
      "use the beta component instead. "
      "See https://cloud.google.com/sdk/gcloud#release_levels"
  )