HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/run/global_methods.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Methods and constants for global access."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import exceptions as api_exceptions
from googlecloudsdk.api_lib.container import api_adapter as container_api_adapter
from googlecloudsdk.api_lib.container.fleet import client as hub_client
from googlecloudsdk.api_lib.container.fleet import util as hub_util
from googlecloudsdk.api_lib.run import job
from googlecloudsdk.api_lib.run import service
from googlecloudsdk.api_lib.run import worker_pool
from googlecloudsdk.api_lib.runtime_config import util
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources

CONTAINER_API_VERSION = 'v1beta1'

SERVERLESS_API_NAME = 'run'
SERVERLESS_API_VERSION = 'v1'

_ALL_REGIONS = '-'

CLOUDRUN_FEATURE = 'appdevexperience'


def GetServerlessClientInstance(api_version=SERVERLESS_API_VERSION):
  return apis.GetClientInstance(SERVERLESS_API_NAME, api_version)


def ListRegions(client):
  """Get the list of all available regions from control plane.

  Args:
    client: (base_api.BaseApiClient), instance of a client to use for the list
      request.

  Returns:
    A list of str, which are regions.
  """
  return sorted([l.locationId for l in ListLocations(client)])


def ListLocations(client):
  """Get the list of all available regions from control plane.

  Args:
    client: (base_api.BaseApiClient), instance of a client to use for the list
      request.

  Returns:
    A list of location resources.
  """
  project_resource_relname = util.ProjectPath(
      properties.VALUES.core.project.Get(required=True)
  )
  return client.projects_locations.List(
      client.MESSAGES_MODULE.RunProjectsLocationsListRequest(
          name=project_resource_relname, pageSize=100
      )
  ).locations


def ListServices(client, region=_ALL_REGIONS, field_selector=None):
  """Get the global services for a OnePlatform project.

  Args:
    client: (base_api.BaseApiClient), instance of a client to use for the list
      request.
    region: (str) optional name of location to search for services in. If not
      passed, this defaults to the global value for all locations.
    field_selector: (str) optional parameter to pass in request.field_selector.

  Returns:
    List of googlecloudsdk.api_lib.run import service.Service objects.
  """
  project = properties.VALUES.core.project.Get(required=True)
  locations = resources.REGISTRY.Parse(
      region,
      params={'projectsId': project},
      collection='run.projects.locations',
  )
  request = client.MESSAGES_MODULE.RunProjectsLocationsServicesListRequest(
      parent=locations.RelativeName(), fieldSelector=field_selector
  )
  response = client.projects_locations_services.List(request)

  # Log the regions that did not respond.
  if response.unreachable:
    log.warning(
        'The following Cloud Run regions did not respond: {}. '
        'List results may be incomplete.'.format(
            ', '.join(sorted(response.unreachable))
        )
    )

  return [
      service.Service(item, client.MESSAGES_MODULE) for item in response.items
  ]


# TODO(b/322180968): Once Worker API is ready, wire up to the Worker API.
def ListWorkers(client):
  return ListServices(client)


def ListJobs(client, namespace):
  """Get the global services for a OnePlatform project.

  Args:
    client: (base_api.BaseApiClient), instance of a client to use for the list
      request.
    namespace: namespace/project to list jobs in

  Returns:
    List of googlecloudsdk.api_lib.run import job.Job objects.
  """
  request = client.MESSAGES_MODULE.RunNamespacesJobsListRequest(
      parent=namespace.RelativeName()
  )
  response = client.namespaces_jobs.List(request)

  # Log the regions that did not respond.
  if response.unreachable:
    log.warning(
        'The following Cloud Run regions did not respond: {}. '
        'List results may be incomplete.'.format(
            ', '.join(sorted(response.unreachable))
        )
    )

  return [job.Job(item, client.MESSAGES_MODULE) for item in response.items]


def ListWorkerPools(client, project):
  """Get the global list of worker pools for a OnePlatform project.

  Args:
    client: (base_api.BaseApiClient), instance of a client to use for the list
      request.
    project: project to list worker pools in

  Returns:
    List of googlecloudsdk.api_lib.run import worker_pool.WorkerPool objects.
  """
  request = client.MESSAGES_MODULE.RunNamespacesWorkerpoolsListRequest(
      parent=project.RelativeName()
  )
  response = client.namespaces_workerpools.List(request)

  # Log the regions that did not respond.
  if response.unreachable:
    log.warning(
        'The following Cloud Run regions did not respond: {}. '
        'List results may be incomplete.'.format(
            ', '.join(sorted(response.unreachable))
        )
    )

  return [
      worker_pool.WorkerPool(item, client.MESSAGES_MODULE)
      for item in response.items
  ]


def ListClusters(location=None, project=None):
  """Get all clusters with Cloud Run enabled.

  Args:
    location: str optional name of location to search for clusters in. Leaving
      this field blank will search in all locations.
    project: str optional name of project to search for clusters in. Leaving
      this field blank will use the project defined by the corresponding
      property.

  Returns:
    List of
    googlecloudsdk.generated_clients.apis.container.CONTAINER_API_VERSION
    import container_CONTAINER_API_VERSION_messages.Cluster objects
  """
  container_api = container_api_adapter.NewAPIAdapter(CONTAINER_API_VERSION)
  if not project:
    project = properties.VALUES.core.project.Get(required=True)

  response = container_api.ListClusters(project, location)
  if response.missingZones:
    log.warning(
        'The following cluster locations did not respond: {}. '
        'List results may be incomplete.'.format(
            ', '.join(response.missingZones)
        )
    )

  def _SortKey(cluster):
    return (cluster.zone, cluster.name)

  clusters = sorted(response.clusters, key=_SortKey)

  crfa_cluster_names = ListCloudRunForAnthosClusters(project)

  return [
      c
      for c in clusters
      if (
          c.name in crfa_cluster_names
          or (
              c.addonsConfig.cloudRunConfig
              and not c.addonsConfig.cloudRunConfig.disabled
          )
      )
  ]


def ListVerifiedDomains(client):
  """Get all verified domains.

  Args:
    client: (base_api.BaseApiClient), instance of a client to use for the list
      request.

  Returns:
    List of client.MESSAGES_MODULE.AuthorizedDomain objects
  """
  project_resource_relname = util.ProjectPath(
      properties.VALUES.core.project.Get(required=True)
  )
  request = client.MESSAGES_MODULE.RunProjectsAuthorizeddomainsListRequest(
      parent=project_resource_relname
  )
  response = client.projects_authorizeddomains.List(request)
  return response.domains


def GetClusterRef(cluster, project=None):
  """Returns a ref for the specified cluster.

  Args:
    cluster: container_CONTAINER_API_VERSION_messages.Cluster object
    project: str optional project which overrides the default

  Returns:
    A Resource object
  """
  if not project:
    project = properties.VALUES.core.project.Get(required=True)
  return resources.REGISTRY.Parse(
      cluster.name,
      params={'projectId': project, 'zone': cluster.zone},
      collection='container.projects.zones.clusters',
  )


def MultiTenantClustersForProject(project_id, cluster_location):
  """Returns a list of clusters accounting for multi-tenant projects.

  This function can also be used for non-multitenant projects and will
  operate on the single passed-in project_id.

  Args:
    project_id: The id of the project, which may or may not be multi-tenant
    cluster_location: The zone or region of the cluster

  Returns:
    A list of cluster refs
  """
  project_ids = []
  project_ids.insert(0, project_id)
  return _ClustersForProjectIds(project_ids, cluster_location)


def _ClustersForProjectIds(project_ids, cluster_location):
  response = []
  for project_id in project_ids:
    clusters = ListClusters(cluster_location, project_id)
    for cluster in clusters:
      response.append(GetClusterRef(cluster, project_id))
  return response


def ListCloudRunForAnthosClusters(project):
  """Get all clusters with Cloud Run for Anthos enabled.

  Args:
   project: str optional of project to search for clusters in. Leaving this
     field blank will use the project defined by the corresponding property.

  Returns:
    List of Cluster string names
  """

  crfa_spec = 'projects/%s/locations/global/features/%s' % (
      project,
      CLOUDRUN_FEATURE,
  )
  try:
    f = hub_client.HubClient().GetFeature(crfa_spec)
  except api_exceptions.HttpError:
    return []

  cluster_state_obj = _ListAnthosClusterStates(f)
  return [name for name, state in cluster_state_obj.items() if state == 'OK']


def _ListAnthosClusterStates(f):
  try:
    cluster_state_obj = {
        hub_util.MembershipShortname(m): s.state.code.name
        for m, s in hub_client.HubClient.ToPyDict(f.membershipStates).items()
    }
  except AttributeError:
    return {}
  return cluster_state_obj