HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/compute/url_maps/import.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Import URL maps command."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk.api_lib.compute import base_classes
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.compute import exceptions as compute_exceptions
from googlecloudsdk.command_lib.compute import flags as compute_flags
from googlecloudsdk.command_lib.compute import scope as compute_scope
from googlecloudsdk.command_lib.compute.url_maps import flags
from googlecloudsdk.command_lib.compute.url_maps import url_maps_utils
from googlecloudsdk.command_lib.export import util as export_util
from googlecloudsdk.core import log
from googlecloudsdk.core import yaml_validator
from googlecloudsdk.core.console import console_io


def _DetailedHelp():
  """Builds the help-text dictionary attached to the import command.

  Returns:
    A dict holding the 'brief', 'DESCRIPTION' and 'EXAMPLES' help entries.
  """
  description = """\
          Imports a URL map's configuration from a file.
          """
  examples = """\
          A URL map can be imported by running:

            $ {command} NAME --source=<path-to-file>
          """
  return {
      'brief': 'Import a URL map.',
      'DESCRIPTION': description,
      'EXAMPLES': examples,
  }


def _GetApiVersion(release_track):
  """Maps a release track onto the API version string used for schemas.

  Args:
    release_track: The release track to translate.

  Returns:
    'alpha' for the ALPHA track, 'beta' for the BETA track and 'v1' for
    anything else.
  """
  if release_track == base.ReleaseTrack.ALPHA:
    version = 'alpha'
  elif release_track == base.ReleaseTrack.BETA:
    version = 'beta'
  else:
    version = 'v1'
  return version


def _GetSchemaPath(release_track, for_help=False):
  """Looks up the schema path of the compute UrlMap resource.

  Args:
    release_track: The release track that selects the API version.
    for_help: Forwarded unchanged to export_util.GetSchemaPath.

  Returns:
    Whatever export_util.GetSchemaPath returns for the 'compute' API, the
    resolved API version and the 'UrlMap' message.
  """
  api_version = _GetApiVersion(release_track)
  return export_util.GetSchemaPath(
      'compute', api_version, 'UrlMap', for_help=for_help)


def _SendPatchRequest(client, resources, url_map_ref, replacement):
  """Patches a URL map and blocks until the resulting operation completes.

  The regional service is used when the reference belongs to the
  'compute.regionUrlMaps' collection; otherwise the global service is used.

  Args:
    client: The API client.
    resources: The resource parser.
    url_map_ref: The URL map reference.
    replacement: The URL map to patch with.

  Returns:
    The operation result.
  """
  messages = client.messages
  if url_map_ref.Collection() == 'compute.regionUrlMaps':
    service = client.apitools_client.regionUrlMaps
    request = messages.ComputeRegionUrlMapsPatchRequest(
        project=url_map_ref.project,
        region=url_map_ref.region,
        urlMap=url_map_ref.Name(),
        urlMapResource=replacement)
  else:
    service = client.apitools_client.urlMaps
    request = messages.ComputeUrlMapsPatchRequest(
        project=url_map_ref.project,
        urlMap=url_map_ref.Name(),
        urlMapResource=replacement)

  operation = service.Patch(request)
  return url_maps_utils.WaitForOperation(
      resources, service, operation, url_map_ref, 'Updating URL map')


def _SendInsertRequest(client, resources, url_map_ref, url_map):
  """Inserts a URL map and blocks until the resulting operation completes.

  The regional service is used when the reference belongs to the
  'compute.regionUrlMaps' collection; otherwise the global service is used.

  Args:
    client: The API client.
    resources: The resource parser.
    url_map_ref: The URL map reference.
    url_map: The URL map to insert.

  Returns:
    The operation result.
  """
  messages = client.messages
  if url_map_ref.Collection() == 'compute.regionUrlMaps':
    service = client.apitools_client.regionUrlMaps
    request = messages.ComputeRegionUrlMapsInsertRequest(
        project=url_map_ref.project,
        region=url_map_ref.region,
        urlMap=url_map)
  else:
    service = client.apitools_client.urlMaps
    request = messages.ComputeUrlMapsInsertRequest(
        project=url_map_ref.project, urlMap=url_map)

  operation = service.Insert(request)
  return url_maps_utils.WaitForOperation(
      resources, service, operation, url_map_ref, 'Creating URL map')


def _GetClearedFieldsForDuration(duration, field_prefix):
  """Collects the Duration field paths to report as cleared.

  Args:
    duration: The Duration object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list with the prefixed name of each of 'seconds' and 'nanos' that
    exists as an attribute on `duration`, in that order.
  """
  # NOTE(review): this tests attribute presence, not whether a value is set;
  # presumably intentional so that 0 is not mistaken for "unset" - confirm.
  return [
      field_prefix + name
      for name in ('seconds', 'nanos')
      if hasattr(duration, name)
  ]


def _GetClearedFieldsForUrlRewrite(url_rewrite, field_prefix):
  """Collects the UrlRewrite field paths cleared by the user.

  Args:
    url_rewrite: The UrlRewrite object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list with the prefixed name of every inspected field whose value is
    falsy.
  """
  inspected = ('pathPrefixRewrite', 'hostRewrite')
  return [
      field_prefix + name
      for name in inspected
      if not getattr(url_rewrite, name)
  ]


def _GetClearedFieldsForRetryPolicy(retry_policy, field_prefix):
  """Collects the RetryPolicy field paths cleared by the user.

  Args:
    retry_policy: The RetryPolicy object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list of prefixed field names. 'retryConditions' is reported when falsy,
    'numRetries' whenever the attribute exists, and 'perTryTimeout' either as
    a whole (when falsy) or through its nested Duration fields.
  """
  cleared = []
  if not retry_policy.retryConditions:
    cleared.append(field_prefix + 'retryConditions')
  # Presence check rather than truthiness, mirroring the Duration helper.
  if hasattr(retry_policy, 'numRetries'):
    cleared.append(field_prefix + 'numRetries')

  per_try_timeout = retry_policy.perTryTimeout
  if per_try_timeout:
    cleared.extend(
        _GetClearedFieldsForDuration(
            per_try_timeout, field_prefix + 'perTryTimeout.'))
  else:
    cleared.append(field_prefix + 'perTryTimeout')
  return cleared


def _GetClearedFieldsForRequestMirrorPolicy(mirror_policy, field_prefix):
  """Collects the RequestMirrorPolicy field paths cleared by the user.

  Args:
    mirror_policy: The RequestMirrorPolicy object to inspect.
    field_prefix: Dotted path prepended to the reported field name.

  Returns:
    A one-element list naming 'backendService' when that field is falsy,
    otherwise an empty list.
  """
  if mirror_policy.backendService:
    return []
  return [field_prefix + 'backendService']


def _GetClearedFieldsForCorsPolicy(cors_policy, field_prefix):
  """Collects the CorsPolicy field paths cleared by the user.

  Args:
    cors_policy: The CorsPolicy object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list with the prefixed name of every inspected field whose value is
    falsy, in the order the fields are listed below.
  """
  inspected = (
      'allowOrigins',
      'allowOriginRegexes',
      'allowMethods',
      'allowHeaders',
      'exposeHeaders',
      'maxAge',
      'allowCredentials',
      'disabled',
  )
  return [
      field_prefix + name
      for name in inspected
      if not getattr(cors_policy, name)
  ]


def _GetClearedFieldsForFaultDelay(fault_delay, field_prefix):
  """Collects the HttpFaultDelay field paths cleared by the user.

  Args:
    fault_delay: The HttpFaultDelay object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list of prefixed field names: 'fixedDelay' as a whole when it is falsy
    (otherwise its nested Duration fields), followed by 'percentage' when
    that is falsy.
  """
  cleared = []
  fixed_delay = fault_delay.fixedDelay
  if fixed_delay:
    cleared.extend(
        _GetClearedFieldsForDuration(fixed_delay, field_prefix + 'fixedDelay.'))
  else:
    cleared.append(field_prefix + 'fixedDelay')
  if not fault_delay.percentage:
    cleared.append(field_prefix + 'percentage')
  return cleared


def _GetClearedFieldsForFaultAbort(fault_abort, field_prefix):
  """Collects the HttpFaultAbort field paths cleared by the user.

  Args:
    fault_abort: The HttpFaultAbort object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list with the prefixed name of every inspected field whose value is
    falsy.
  """
  inspected = ('httpStatus', 'percentage')
  return [
      field_prefix + name
      for name in inspected
      if not getattr(fault_abort, name)
  ]


def _GetClearedFieldsForFaultInjectionPolicy(fault_injection_policy,
                                             field_prefix):
  """Collects the FaultInjectionPolicy field paths cleared by the user.

  Args:
    fault_injection_policy: The FaultInjectionPolicy object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list of prefixed field names. A falsy 'delay' or 'abort' is reported as
    a whole; a populated one contributes its own cleared sub-fields instead.
  """
  cleared = []

  delay = fault_injection_policy.delay
  if delay:
    cleared.extend(
        _GetClearedFieldsForFaultDelay(delay, field_prefix + 'delay.'))
  else:
    cleared.append(field_prefix + 'delay')

  abort = fault_injection_policy.abort
  if abort:
    cleared.extend(
        _GetClearedFieldsForFaultAbort(abort, field_prefix + 'abort.'))
  else:
    cleared.append(field_prefix + 'abort')

  return cleared


def _GetClearedFieldsForRoutAction(route_action, field_prefix):
  """Collects the HttpRouteAction field paths cleared by the user.

  Args:
    route_action: The HttpRouteAction object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list of prefixed field names. Each falsy field is reported as a whole;
    each populated nested message contributes the cleared sub-fields found
    by its dedicated helper.
  """
  cleared = []

  if not route_action.weightedBackendServices:
    cleared.append(field_prefix + 'weightedBackendServices')

  url_rewrite = route_action.urlRewrite
  if url_rewrite:
    cleared.extend(
        _GetClearedFieldsForUrlRewrite(
            url_rewrite, field_prefix + 'urlRewrite.'))
  else:
    cleared.append(field_prefix + 'urlRewrite')

  timeout = route_action.timeout
  if timeout:
    cleared.extend(
        _GetClearedFieldsForDuration(timeout, field_prefix + 'timeout.'))
  else:
    cleared.append(field_prefix + 'timeout')

  retry_policy = route_action.retryPolicy
  if retry_policy:
    cleared.extend(
        _GetClearedFieldsForRetryPolicy(
            retry_policy, field_prefix + 'retryPolicy.'))
  else:
    cleared.append(field_prefix + 'retryPolicy')

  mirror_policy = route_action.requestMirrorPolicy
  if mirror_policy:
    cleared.extend(
        _GetClearedFieldsForRequestMirrorPolicy(
            mirror_policy, field_prefix + 'requestMirrorPolicy.'))
  else:
    cleared.append(field_prefix + 'requestMirrorPolicy')

  cors_policy = route_action.corsPolicy
  if cors_policy:
    cleared.extend(
        _GetClearedFieldsForCorsPolicy(
            cors_policy, field_prefix + 'corsPolicy.'))
  else:
    cleared.append(field_prefix + 'corsPolicy')

  fault_injection_policy = route_action.faultInjectionPolicy
  if fault_injection_policy:
    cleared.extend(
        _GetClearedFieldsForFaultInjectionPolicy(
            fault_injection_policy, field_prefix + 'faultInjectionPolicy.'))
  else:
    cleared.append(field_prefix + 'faultInjectionPolicy')

  return cleared


def _GetClearedFieldsForCustomErrorResponsePolicy(
    custom_error_response_policy, field_prefix
):
  """Collects the CustomErrorResponsePolicy field paths cleared by the user.

  Args:
    custom_error_response_policy: The CustomErrorResponsePolicy object to
      inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list with the prefixed name of every inspected field whose value is
    falsy.
  """
  inspected = ('errorResponseRules', 'errorService')
  return [
      field_prefix + name
      for name in inspected
      if not getattr(custom_error_response_policy, name)
  ]


def _GetClearedFieldsForUrlRedirect(url_redirect, field_prefix):
  """Collects the UrlRedirect field paths cleared by the user.

  Args:
    url_redirect: The UrlRedirect object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list with the prefixed name of every inspected field whose value is
    falsy, in the order the fields are listed below.
  """
  inspected = (
      'hostRedirect',
      'pathRedirect',
      'prefixRedirect',
      'redirectResponseCode',
      'httpsRedirect',
      'stripQuery',
  )
  return [
      field_prefix + name
      for name in inspected
      if not getattr(url_redirect, name)
  ]


def _GetClearedFieldsForHeaderAction(header_action, field_prefix):
  """Collects the HeaderAction field paths cleared by the user.

  Args:
    header_action: The HeaderAction object to inspect.
    field_prefix: Dotted path prepended to each reported field name.

  Returns:
    A list with the prefixed name of every inspected field whose value is
    falsy, in the order the fields are listed below.
  """
  inspected = (
      'requestHeadersToRemove',
      'requestHeadersToAdd',
      'responseHeadersToRemove',
      'responseHeadersToAdd',
  )
  return [
      field_prefix + name
      for name in inspected
      if not getattr(header_action, name)
  ]


def _Run(args, holder, url_map_arg, release_track):
  """Issues requests necessary to import URL maps.

  The URL map definition is read from `args.source` (or from '-' when no
  source is given), validated against the UrlMap schema of the release track
  and then either inserted as a new URL map or used to patch the existing one.
  When patching, every field left unset in the imported definition is listed
  as cleared so that it is included in the request.

  Args:
    args: The parsed command arguments; `args.source` is read here.
    holder: The API holder exposing `client` and `resources`.
    url_map_arg: The resource argument used to resolve the URL map reference.
    release_track: The release track, used to pick the schema version.

  Returns:
    The result of the insert or patch operation, or None when the imported
    URL map equals the existing one and no request is sent.

  Raises:
    compute_exceptions.ValidationError: If the input fails schema validation.
    apitools_exceptions.HttpError: If fetching the existing URL map fails
      with a status code other than 404.
  """
  client = holder.client
  resources = holder.resources

  url_map_ref = url_map_arg.ResolveAsResource(
      args,
      resources,
      default_scope=compute_scope.ScopeEnum.GLOBAL,
      scope_lister=compute_flags.GetDefaultScopeLister(client))

  data = console_io.ReadFromFileOrStdin(args.source or '-', binary=False)

  try:
    url_map = export_util.Import(
        message_type=client.messages.UrlMap,
        stream=data,
        schema_path=_GetSchemaPath(release_track))
  except yaml_validator.ValidationError as e:
    raise compute_exceptions.ValidationError(str(e))

  if url_map.name != url_map_ref.Name():
    # Replace warning and raise error after 10/01/2021
    log.warning('The name of the Url Map must match the value of the ' +
                '\'name\' attribute in the YAML file. Future versions of ' +
                'gcloud will fail with an error.')

  # Get existing URL map.
  try:
    url_map_old = url_maps_utils.SendGetRequest(client, url_map_ref)
  except apitools_exceptions.HttpError as error:
    if error.status_code != 404:
      # Bare raise keeps the original traceback intact.
      raise
    # Url Map does not exist, create a new one.
    return _SendInsertRequest(client, resources, url_map_ref, url_map)

  # No change, do not send requests to server.
  if url_map_old == url_map:
    return

  console_io.PromptContinue(
      message=('Url Map [{0}] will be overwritten.').format(url_map_ref.Name()),
      cancel_on_no=True)

  # Populate id and fingerprint fields when YAML files don't contain them.
  if not url_map.id:
    url_map.id = url_map_old.id
  # Only a missing fingerprint is filled in from the existing URL map; a
  # fingerprint supplied in the YAML file is sent unchanged so that it still
  # takes part in the server's optimistic-locking check.
  if not url_map.fingerprint:
    # Replace warning and raise error after 10/01/2021
    log.warning('An up-to-date fingerprint must be provided to ' +
                'update the Url Map. Future versions of gcloud will fail ' +
                'with an error \'412 conditionNotMet\'')
    url_map.fingerprint = url_map_old.fingerprint

  # Unspecified fields are assumed to be cleared.
  # TODO(b/182287403) Replace with proto reflection and update scenario tests.
  cleared_fields = []
  if not url_map.description:
    cleared_fields.append('description')
  if not url_map.hostRules:
    cleared_fields.append('hostRules')
  if not url_map.pathMatchers:
    cleared_fields.append('pathMatchers')
  if not url_map.tests:
    cleared_fields.append('tests')
  if not url_map.defaultService:
    cleared_fields.append('defaultService')
  # Nested messages: report the whole field when unset, otherwise descend and
  # report only the cleared sub-fields.
  if not url_map.defaultCustomErrorResponsePolicy:
    cleared_fields.append('defaultCustomErrorResponsePolicy')
  else:
    cleared_fields += _GetClearedFieldsForCustomErrorResponsePolicy(
        url_map.defaultCustomErrorResponsePolicy,
        'defaultCustomErrorResponsePolicy.',
    )
  if not url_map.defaultRouteAction:
    cleared_fields.append('defaultRouteAction')
  else:
    cleared_fields += _GetClearedFieldsForRoutAction(
        url_map.defaultRouteAction, 'defaultRouteAction.')
  if not url_map.defaultUrlRedirect:
    cleared_fields.append('defaultUrlRedirect')
  else:
    cleared_fields += _GetClearedFieldsForUrlRedirect(
        url_map.defaultUrlRedirect, 'defaultUrlRedirect.')
  if not url_map.headerAction:
    cleared_fields.append('headerAction')
  else:
    cleared_fields += _GetClearedFieldsForHeaderAction(
        url_map.headerAction, 'headerAction.')

  with client.apitools_client.IncludeFields(cleared_fields):
    return _SendPatchRequest(client, resources, url_map_ref, url_map)


@base.ReleaseTracks(
    base.ReleaseTrack.GA, base.ReleaseTrack.BETA, base.ReleaseTrack.ALPHA
)
@base.UniverseCompatible
class Import(base.UpdateCommand):
  """Import a URL map."""

  # Set by Args(); holds the URL map resource argument that Run() resolves.
  URL_MAP_ARG = None
  detailed_help = _DetailedHelp()

  @classmethod
  def Args(cls, parser):
    """Registers the URL map argument and the import flags on the parser.

    Args:
      parser: The argument parser to add the arguments and flags to.
    """
    url_map_arg = flags.UrlMapArgument()
    cls.URL_MAP_ARG = url_map_arg
    url_map_arg.AddArgument(parser, operation_type='import')

    schema_path = _GetSchemaPath(cls.ReleaseTrack(), for_help=True)
    export_util.AddImportFlags(parser, schema_path)

  def Run(self, args):
    """Runs the import for the release track of this command.

    Args:
      args: The parsed command arguments.

    Returns:
      Whatever _Run returns for the given arguments.
    """
    track = self.ReleaseTrack()
    holder = base_classes.ComputeApiHolder(track)
    return _Run(args, holder, self.URL_MAP_ARG, track)