HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/run/resource_args.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared resource flags for Cloud Run commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import abc
import copy
import os
import re

from googlecloudsdk.api_lib.run import global_methods
from googlecloudsdk.calliope.concepts import concepts
from googlecloudsdk.calliope.concepts import deps
from googlecloudsdk.calliope.concepts import util as concepts_util
from googlecloudsdk.command_lib.run import exceptions
from googlecloudsdk.command_lib.run import platforms
from googlecloudsdk.command_lib.util.concepts import presentation_specs
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core.console import console_io


class PromptFallthrough(deps.Fallthrough):
  """Fall through to reading from an interactive prompt."""

  def __init__(self, hint):
    super(PromptFallthrough, self).__init__(function=None, hint=hint)

  @abc.abstractmethod
  def _Prompt(self, parsed_args):
    pass

  def _Call(self, parsed_args):
    if not console_io.CanPrompt():
      return None
    return self._Prompt(parsed_args)


def _GenerateServiceName(image):
  """Produce a valid default service name from a container image path.

  Converts a file path or image path into a reasonable default service name by
  stripping file path delimeters, image tags, and image hashes.
  For example, the image name 'gcr.io/myproject/myimage:latest' would produce
  the service name 'myimage'.

  Args:
    image: str, The container path.

  Returns:
    A valid Cloud Run service name.
  """
  base_name = os.path.basename(image.rstrip(os.sep))
  base_name = base_name.split(':')[0]  # Discard image tag if present.
  base_name = base_name.split('@')[0]  # Disacard image hash if present.
  # Remove non-supported special characters.
  return re.sub(r'[^a-zA-Z0-9-]', '', base_name).strip('-').lower()


def _GenerateServiceNameFromLocalPath(source):
  """Produce a valid default service name from a local file or directory path.

  Converts a file or directory path into a reasonable default service name by
  resolving relative paths to absolute paths, removing any extensions, and then
  removing any invalid characters.

  For example, the paths /tmp/foo/bar/.. and /tmp/foo.tar.gz would both produce
  the service name 'foo'. A source path of "." will be expanded to the current
  directory name."

  Args:
    source: str, The file or directory path.

  Returns:
    A valid Cloud Run service name.
  """
  path, ext = os.path.splitext(os.path.abspath(source))
  while ext:
    path, ext = os.path.splitext(path)
  return _GenerateServiceName(path)


class ResourcePromptFallthrough(PromptFallthrough):
  """Fall through to reading the resource name from an interactive prompt."""

  def __init__(self, resource_type_lower):
    super(ResourcePromptFallthrough, self).__init__(
        'specify the {} name from an interactive prompt'.format(
            resource_type_lower
        )
    )
    self.resource_type_lower = resource_type_lower

  def _Prompt(self, parsed_args):
    message = self.resource_type_lower.capitalize() + ' name'
    default_name = self._DefaultNameFromArgs(parsed_args)
    return console_io.PromptWithDefault(message=message, default=default_name)

  def _DefaultNameFromArgs(self, parsed_args):
    if getattr(parsed_args, 'image', None):
      return _GenerateServiceName(parsed_args.image)
    elif getattr(parsed_args, 'source', None):
      return _GenerateServiceNameFromLocalPath(parsed_args.source)
    return ''


class RegionPromptFallthrough(PromptFallthrough):
  """Fall through to reading the region from an interactive prompt."""

  def __init__(self):
    super(RegionPromptFallthrough, self).__init__(
        'specify the region from an interactive prompt'
    )

  def _Prompt(self, parsed_args):
    client = global_methods.GetServerlessClientInstance()
    all_regions = global_methods.ListRegions(client)
    idx = console_io.PromptChoice(
        all_regions,
        message='Please specify a region:\n',
        cancel_option=True,
        allow_freeform=True,
    )
    region = all_regions[idx]
    log.status.Print(
        'To make this the default region, run '
        '`gcloud config set run/region {}`.\n'.format(region)
    )
    if region:
      parsed_args.region = region
    return region


class ServicePromptFallthrough(ResourcePromptFallthrough):

  def __init__(self):
    super(ServicePromptFallthrough, self).__init__('service')


class WorkerPoolPromptFallthrough(ResourcePromptFallthrough):

  def __init__(self):
    super(WorkerPoolPromptFallthrough, self).__init__('workerpool')


class JobPromptFallthrough(ResourcePromptFallthrough):

  def __init__(self):
    super(JobPromptFallthrough, self).__init__('job')


class ExecutionPromptFallthrough(ResourcePromptFallthrough):

  def __init__(self):
    super(ExecutionPromptFallthrough, self).__init__('execution')


class DefaultFallthrough(deps.Fallthrough):
  """Use the namespace "default".

  For Knative only.

  For Cloud Run, raises an ArgumentError if project not set.
  """

  def __init__(self):
    super(DefaultFallthrough, self).__init__(
        function=None,
        hint=(
            'For Cloud Run on Kubernetes Engine, defaults to "default". '
            'Otherwise, defaults to project ID.'
        ),
    )

  def _Call(self, parsed_args):
    if (
        platforms.GetPlatform() == platforms.PLATFORM_GKE
        or platforms.GetPlatform() == platforms.PLATFORM_KUBERNETES
    ):
      return 'default'
    elif not (
        getattr(parsed_args, 'project', None)
        or properties.VALUES.core.project.Get()
    ):
      # HACK: Compensate for how "namespace" is actually "project" in Cloud Run
      # by providing an error message explicitly early here.
      raise exceptions.ArgumentError(
          'The [project] resource is not properly specified. '
          'Please specify the argument [--project] on the command line or '
          'set the property [core/project].'
      )
    return None


def NamespaceAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='namespace',
      help_text=(
          'Specific to Cloud Run for Anthos: '
          'Kubernetes namespace for the {resource}.'
      ),
      fallthroughs=[
          deps.PropertyFallthrough(properties.VALUES.run.namespace),
          DefaultFallthrough(),
          deps.ArgFallthrough('project'),
          deps.PropertyFallthrough(properties.VALUES.core.project),
      ],
  )


def ProjectAttributeConfig():
  project_attribute_config = copy.deepcopy(
      concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG
  )
  fallthroughs = [
      DefaultFallthrough()
  ] + concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG.fallthroughs
  project_attribute_config.fallthroughs = fallthroughs
  return project_attribute_config


def ServiceAttributeConfig(prompt=False):
  """Attribute config with fallthrough prompt only if requested."""
  if prompt:
    fallthroughs = [ServicePromptFallthrough()]
  else:
    fallthroughs = []
  return concepts.ResourceParameterAttributeConfig(
      name='service',
      help_text='Service for the {resource}.',
      fallthroughs=fallthroughs,
  )


def WorkerPoolAttributeConfig(prompt=False):
  """Attribute config with fallthrough prompt only if requested."""
  if prompt:
    fallthroughs = [WorkerPoolPromptFallthrough()]
  else:
    fallthroughs = []
  return concepts.ResourceParameterAttributeConfig(
      name='worker-pool',
      help_text='WorkerPool for the {resource}.',
      fallthroughs=fallthroughs,
  )


def ConfigurationAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='configuration', help_text='Configuration for the {resource}.'
  )


def RouteAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='route', help_text='Route for the {resource}.'
  )


def RevisionAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='revision', help_text='Revision for the {resource}.'
  )


def DomainAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='domain', help_text='Name of the domain to be mapped to.'
  )


def JobAttributeConfig(prompt=False):
  if prompt:
    fallthroughs = [JobPromptFallthrough()]
  else:
    fallthroughs = []
  return concepts.ResourceParameterAttributeConfig(
      name='jobs',
      help_text='Job for the {resource}.',
      fallthroughs=fallthroughs,
  )


def ExecutionAttributeConfig(prompt=False):
  if prompt:
    fallthroughs = [ExecutionPromptFallthrough()]
  else:
    fallthroughs = []
  return concepts.ResourceParameterAttributeConfig(
      name='executions', help_text='Execution.', fallthroughs=fallthroughs
  )


class TaskExecutionAndIndexFallthrough(deps.ArgFallthrough):
  """Allow the user to provide --execution and --index to find a task."""

  def __init__(self, arg_name, plural=False):
    super(TaskExecutionAndIndexFallthrough, self).__init__(
        'provide the arguments `{}`  and `index` on the command line'.format(
            arg_name
        ),
        active=True,
        plural=plural,
    )
    self.arg_name = arg_name

  def _Call(self, parsed_args):
    prefix = getattr(
        parsed_args, concepts_util.NamespaceFormat(self.arg_name), None
    )
    index = getattr(parsed_args, 'index', None)
    return '{}-{}'.format(prefix, index)


def TaskAttributeConfig(prompt=False):
  if prompt:
    fallthroughs = [TaskExecutionAndIndexFallthrough('task')]
  else:
    fallthroughs = []
  return concepts.ResourceParameterAttributeConfig(
      name='tasks', help_text='Task.', fallthroughs=fallthroughs
  )


def LocationAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='region',
      help_text=(
          'The Cloud region for the {resource}. Overrides the default '
          '`run/region` property value for this command invocation.'
      ),
      fallthroughs=[
          deps.ArgFallthrough('--region'),
          deps.PropertyFallthrough(properties.VALUES.run.region),
          RegionPromptFallthrough(),
      ],
  )


class ClusterPromptFallthrough(PromptFallthrough):
  """Fall through to reading the cluster name from an interactive prompt."""

  def __init__(self):
    super(ClusterPromptFallthrough, self).__init__(
        'specify the cluster from a list of available clusters'
    )

  def _Prompt(self, parsed_args):
    """Fallthrough to reading the cluster name from an interactive prompt.

    Only prompt for cluster name if the user-specified platform is GKE.

    Args:
      parsed_args: Namespace, the args namespace.

    Returns:
      A cluster name string
    """
    if platforms.GetPlatform() != platforms.PLATFORM_GKE:
      return

    project = properties.VALUES.core.project.Get(required=True)
    cluster_location = (
        getattr(parsed_args, 'cluster_location', None)
        or properties.VALUES.run.cluster_location.Get()
    )
    cluster_location_msg = (
        ' in [{}]'.format(cluster_location) if cluster_location else ''
    )

    cluster_refs = global_methods.MultiTenantClustersForProject(
        project, cluster_location
    )
    if not cluster_refs:
      raise exceptions.ConfigurationError(
          'No compatible clusters found{}. '
          'Ensure your cluster has Cloud Run enabled.'.format(
              cluster_location_msg
          )
      )

    cluster_refs_descs = [
        self._GetClusterDescription(c, cluster_location, project)
        for c in cluster_refs
    ]

    idx = console_io.PromptChoice(
        cluster_refs_descs,
        message='GKE cluster{}:'.format(cluster_location_msg),
        cancel_option=True,
    )

    cluster_ref = cluster_refs[idx]

    if cluster_location:
      location_help_text = ''
    else:
      location_help_text = (
          ' && gcloud config set run/cluster_location {}'.format(
              cluster_ref.zone
          )
      )

    cluster_name = cluster_ref.Name()

    if cluster_ref.projectId != project:
      cluster_name = cluster_ref.RelativeName()
      location_help_text = ''

    log.status.Print(
        'To make this the default cluster, run '
        '`gcloud config set run/cluster {cluster}'
        '{location}`.\n'.format(
            cluster=cluster_name, location=location_help_text
        )
    )
    return cluster_ref.SelfLink()

  def _GetClusterDescription(self, cluster, cluster_location, project):
    """Description of cluster for prompt."""

    response = cluster.Name()
    if not cluster_location:
      response = '{} in {}'.format(response, cluster.zone)
    if project != cluster.projectId:
      response = '{} in {}'.format(response, cluster.projectId)

    return response


def ClusterAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='cluster',
      help_text=(
          'Name of the Kubernetes Engine cluster to use. '
          'Alternatively, set the property [run/cluster].'
      ),
      fallthroughs=[
          deps.PropertyFallthrough(properties.VALUES.run.cluster),
          ClusterPromptFallthrough(),
      ],
  )


class ClusterLocationPromptFallthrough(PromptFallthrough):
  """Fall through to reading the cluster name from an interactive prompt."""

  def __init__(self):
    super(ClusterLocationPromptFallthrough, self).__init__(
        'specify the cluster location from a list of available zones'
    )

  def _Prompt(self, parsed_args):
    """Fallthrough to reading the cluster location from an interactive prompt.

    Only prompt for cluster location if the user-specified platform is GKE
    and if cluster name is already defined.

    Args:
      parsed_args: Namespace, the args namespace.

    Returns:
      A cluster location string
    """
    cluster_name = (
        getattr(parsed_args, 'cluster', None)
        or properties.VALUES.run.cluster.Get()
    )
    if platforms.GetPlatform() == platforms.PLATFORM_GKE and cluster_name:
      clusters = [
          c for c in global_methods.ListClusters() if c.name == cluster_name
      ]
      if not clusters:
        raise exceptions.ConfigurationError(
            'No cluster locations found for cluster [{}]. '
            'Ensure your clusters have Cloud Run enabled.'.format(cluster_name)
        )
      cluster_locations = [c.zone for c in clusters]
      idx = console_io.PromptChoice(
          cluster_locations,
          message='GKE cluster location for [{}]:'.format(cluster_name),
          cancel_option=True,
      )
      location = cluster_locations[idx]
      log.status.Print(
          'To make this the default cluster location, run '
          '`gcloud config set run/cluster_location {}`.\n'.format(location)
      )
      return location


def ClusterLocationAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='location',
      help_text=(
          'Zone in which the {resource} is located. '
          'Alternatively, set the property [run/cluster_location].'
      ),
      fallthroughs=[
          deps.PropertyFallthrough(properties.VALUES.run.cluster_location),
          ClusterLocationPromptFallthrough(),
      ],
  )


def GetClusterResourceSpec():
  return concepts.ResourceSpec(
      'container.projects.zones.clusters',
      projectId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      zone=ClusterLocationAttributeConfig(),
      clusterId=ClusterAttributeConfig(),
      resource_name='cluster',
  )


def GetServiceResourceSpec(prompt=False):
  return concepts.ResourceSpec(
      'run.namespaces.services',
      namespacesId=NamespaceAttributeConfig(),
      servicesId=ServiceAttributeConfig(prompt),
      resource_name='service',
  )


def GetConfigurationResourceSpec():
  return concepts.ResourceSpec(
      'run.namespaces.configurations',
      namespacesId=NamespaceAttributeConfig(),
      configurationsId=ConfigurationAttributeConfig(),
      resource_name='configuration',
  )


def GetRouteResourceSpec():
  return concepts.ResourceSpec(
      'run.namespaces.routes',
      namespacesId=NamespaceAttributeConfig(),
      routesId=RouteAttributeConfig(),
      resource_name='route',
  )


# For Worker Pool revisions, we don't use the namespace attribute.
def GetRevisionResourceSpec(is_worker_pool_revision=False):
  return concepts.ResourceSpec(
      'run.namespaces.revisions',
      namespacesId=NamespaceAttributeConfig()
      if not is_worker_pool_revision
      else concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      revisionsId=RevisionAttributeConfig(),
      resource_name='revision',
  )


def GetDomainMappingResourceSpec():
  return concepts.ResourceSpec(
      'run.namespaces.domainmappings',
      namespacesId=NamespaceAttributeConfig(),
      domainmappingsId=DomainAttributeConfig(),
      resource_name='DomainMapping',
  )


def GetJobResourceSpec(prompt=False):
  return concepts.ResourceSpec(
      'run.namespaces.jobs',
      namespacesId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      jobsId=JobAttributeConfig(prompt=prompt),
      resource_name='Job',
      api_version='v1',
  )


def GetExecutionResourceSpec(prompt=False):
  return concepts.ResourceSpec(
      'run.namespaces.executions',
      namespacesId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      executionsId=ExecutionAttributeConfig(prompt=prompt),
      resource_name='Execution',
      api_version='v1',
  )


def GetTaskResourceSpec(prompt=False):
  return concepts.ResourceSpec(
      'run.namespaces.tasks',
      namespacesId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      tasksId=TaskAttributeConfig(prompt=prompt),
      resource_name='Task',
      api_version='v1',
  )


def GetV1WorkerPoolResourceSpec(prompt=False):
  return concepts.ResourceSpec(
      'run.namespaces.workerpools',
      namespacesId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      workerpoolsId=WorkerPoolAttributeConfig(prompt),
      resource_name='WorkerPool',
      api_version='v1',
  )


def GetV2WorkerPoolResourceSpec(prompt=False):
  return concepts.ResourceSpec(
      'run.projects.locations.workerPools',
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      locationsId=LocationAttributeConfig(),
      workerPoolsId=WorkerPoolAttributeConfig(prompt),
      resource_name='WorkerPool',
      api_version='v2',
  )


def GetV2WorkerPoolRevisionResourceSpec(prompt=False):
  return concepts.ResourceSpec(
      'run.projects.locations.workerPools.revisions',
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      locationsId=LocationAttributeConfig(),
      workerPoolsId=WorkerPoolAttributeConfig(prompt),
      revisionsId=RevisionAttributeConfig(),
      resource_name='WorkerPoolRevision',
      api_version='v2',
  )


def GetProjectResourceSpec():
  return concepts.ResourceSpec(
      'run.projects',
      resource_name='project',
      projectsId=ProjectAttributeConfig(),
  )


def GetRegionResourceSpec():
  return concepts.ResourceSpec(
      'run.projects.locations',
      projectsId=concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG,
      locationsId=LocationAttributeConfig(),
      resource_name='Region',
      api_version='v2',
  )


def GetNamespaceResourceSpec():
  """Returns a resource spec for the namespace."""
  # TODO(b/148817410): Remove this when the api has been split.
  # This try/except block is needed because the v1alpha1 and v1 run apis
  # have different collection names for the namespaces.
  try:
    return concepts.ResourceSpec(
        'run.namespaces',
        namespacesId=NamespaceAttributeConfig(),
        resource_name='namespace',
    )
  except resources.InvalidCollectionException:
    return concepts.ResourceSpec(
        'run.api.v1.namespaces',
        namespacesId=NamespaceAttributeConfig(),
        resource_name='namespace',
    )


CLUSTER_PRESENTATION = presentation_specs.ResourcePresentationSpec(
    '--cluster',
    GetClusterResourceSpec(),
    'Kubernetes Engine cluster to connect to.',
    hidden=True,
    required=False,
    prefixes=True,
)

REGION_PRESENTATION = presentation_specs.ResourcePresentationSpec(
    '--region',
    GetRegionResourceSpec(),
    'Cloud region to use.',
    hidden=True,
    required=False,
    prefixes=True,
)