HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/run/printers/k8s_object_printer_util.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Contains shared methods for printing k8s object in a human-readable way."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import json
import textwrap
from typing import Mapping, Union

from googlecloudsdk.api_lib.run import container_resource
from googlecloudsdk.api_lib.run import k8s_object
from googlecloudsdk.core.console import console_attr
from googlecloudsdk.core.resource import custom_printer_base as cp


def OrderByKey(map_):
  """Yields the (key, value) pairs of a mapping in sorted key order.

  None keys and None values are replaced by the empty string so that the
  pairs can always be sorted and printed.

  Args:
    map_: A dictionary-like object supporting iteration over its keys and
      `get`.

  Yields:
    (key, value) tuples ordered by key, with None replaced by ''.
  """

  def _Printable(item):
    return '' if item is None else item

  # Sort on the normalized key but look the value up with the original key:
  # looking up the normalized '' would lose the value stored under None.
  for key in sorted(map_, key=_Printable):
    yield _Printable(key), _Printable(map_.get(key))


def FormatReadyMessage(record):
  """Returns the record's status condition Ready (or equivalent) message.

  Args:
    record: A resource exposing `ready_condition`, `status` and
      `ReadySymbolAndColor()`.

  Returns:
    The ready message prefixed by the ready symbol, wrapped at 100 columns and
    colorized; a red error notice when the record has no status; otherwise
    the empty string.
  """
  ready = record.ready_condition
  if ready and ready['message']:
    symbol, color = record.ReadySymbolAndColor()
    attr = console_attr.GetConsoleAttr()
    wrapped = textwrap.fill('{} {}'.format(symbol, ready['message']), 100)
    return attr.Colorize(wrapped, color)
  if record.status is None:
    return console_attr.GetConsoleAttr().Colorize(
        'Error getting status information', 'red'
    )
  return ''


def LastUpdatedMessage(record):
  """Returns a one-line summary of when and by whom the record was updated.

  Args:
    record: A resource exposing `status` (with `conditions`) and
      `last_modifier`.

  Returns:
    'Unknown update information' when the record has no status, otherwise
    'Last updated on <time> by <modifier>', with '?' standing in for any
    piece that is missing.
  """
  status = record.status
  if status is None:
    return 'Unknown update information'
  modifier = record.last_modifier or '?'
  ready_times = [
      cond.lastTransitionTime
      for cond in status.conditions
      if cond.type == 'Ready' and cond.lastTransitionTime
  ]
  # When several Ready conditions carry a time, the last one listed wins.
  updated_on = ready_times[-1] if ready_times else '?'
  return 'Last updated on {} by {}'.format(updated_on, modifier)


def LastUpdatedMessageForJob(record):
  """Returns a one-line summary of when and by whom the job was updated.

  Args:
    record: A resource exposing `last_modifier` and
      `last_modified_timestamp`.

  Returns:
    'Last updated on <time> by <modifier>', with '?' standing in for any
    piece that is missing.
  """
  modified_by = record.last_modifier
  if not modified_by:
    modified_by = '?'
  modified_at = record.last_modified_timestamp
  if not modified_at:
    modified_at = '?'
  return 'Last updated on {} by {}'.format(modified_at, modified_by)


def GetLabels(labels):
  """Returns a human readable description of user provided labels if any.

  Args:
    labels: A mapping of label keys to values; may be empty or None.

  Returns:
    The sorted 'key:value' entries joined by single spaces, leaving out keys
    that start with k8s_object.INTERNAL_GROUPS, or '' when there are no
    labels.
  """
  if not labels:
    return ''
  user_labels = []
  for key, value in labels.items():
    if key.startswith(k8s_object.INTERNAL_GROUPS):
      continue
    user_labels.append('{}:{}'.format(key, value))
  user_labels.sort()
  return ' '.join(user_labels)


def BuildHeader(record, is_multi_region=False):
  """Returns the emphasized header line for a record.

  The header reads '<status> <kind> <name> in <place>', where the place is
  the record's region(s) or, when reading the region raises KeyError, its
  namespace.

  Args:
    record: The resource to describe.
    is_multi_region: Whether to label the record as multi-region.

  Returns:
    The header string passed through the console's Emphasize.
  """
  attr = console_attr.GetConsoleAttr()
  ready_symbol_and_color = record.ReadySymbolAndColor()
  status = attr.Colorize(*ready_symbol_and_color)
  if is_multi_region:
    place_label, kind_prefix = 'regions ', 'Multi-Region '
  else:
    place_label, kind_prefix = 'region ', ''
  try:
    place = place_label + record.region
  except KeyError:
    place = 'namespace ' + record.namespace
  kind = kind_prefix + record.Kind()
  header = '{} {} {} in {}'.format(status, kind, record.name, place)
  return attr.Emphasize(header)


def GetCloudSqlInstances(record):
  """Returns the cloudsql-instances annotation formatted for display.

  Args:
    record: A dictionary-like object containing the CLOUDSQL_ANNOTATION.

  Returns:
    The annotation value with a space inserted after every comma, or '' when
    the annotation is absent.
  """
  raw_instances = record.get(container_resource.CLOUDSQL_ANNOTATION, '')
  return ', '.join(raw_instances.split(','))


def GetVpcNetwork(record):
  """Returns the VPC Network setting.

  Either the values of the vpc-access-connector and vpc-access-egress, or the
  values of the network and subnetwork in network-interfaces annotation and
  vpc-access-egress.

  Args:
    record: A dictionary-like object containing the VPC_ACCESS_ANNOTATION and
      EGRESS_SETTINGS_ANNOTATION keys.

  Returns:
    A cp.Labeled table describing the connector or the first network
    interface, or '' when neither annotation is set or the network-interfaces
    annotation cannot be processed.
  """
  vpc_connector = record.get(container_resource.VPC_ACCESS_ANNOTATION, '')
  if vpc_connector:
    egress = record.get(container_resource.EGRESS_SETTINGS_ANNOTATION, '')
    return cp.Labeled([
        ('Connector', vpc_connector),
        ('Egress', egress),
    ])
  # Direct VPC case if annotation exists.
  raw_interfaces = record.get(k8s_object.NETWORK_INTERFACES_ANNOTATION, '')
  if not raw_interfaces:
    return ''
  try:
    # Only the first entry of the JSON-encoded annotation is displayed.
    first_interface = json.loads(raw_interfaces)[0]
    rows = [
        ('Network', first_interface.get('network', '')),
        ('Subnet', first_interface.get('subnetwork', '')),
        (
            'Egress',
            record.get(container_resource.EGRESS_SETTINGS_ANNOTATION, ''),
        ),
    ]
    return cp.Labeled(rows)
  except Exception:  # pylint: disable=broad-except
    # Best effort: any failure while processing the annotation prints nothing.
    return ''


def GetBinAuthzPolicy(record):
  """Returns the record's BINAUTHZ_POLICY_ANNOTATION value, or '' if unset."""
  annotations = record.annotations
  return annotations.get(k8s_object.BINAUTHZ_POLICY_ANNOTATION, '')


def GetBinAuthzBreakglass(record):
  """Returns the BINAUTHZ_BREAKGLASS_ANNOTATION value, or None if unset."""
  annotations = record.annotations
  return annotations.get(k8s_object.BINAUTHZ_BREAKGLASS_ANNOTATION)


def GetDescription(record):
  """Returns the record's DESCRIPTION_ANNOTATION value, or None if unset."""
  annotations = record.annotations
  return annotations.get(k8s_object.DESCRIPTION_ANNOTATION)


def GetExecutionEnvironment(record):
  """Returns the EXECUTION_ENVIRONMENT_ANNOTATION value, or '' if unset."""
  annotations = record.annotations
  return annotations.get(k8s_object.EXECUTION_ENVIRONMENT_ANNOTATION, '')


def GetThreatDetectionEnabled(record):
  """Returns 'Enabled' when THREAT_DETECTION_ANNOTATION is 'true', else ''.

  The annotation value is compared case-insensitively.

  Args:
    record: A resource exposing an `annotations` mapping.
  """
  setting = record.annotations.get(k8s_object.THREAT_DETECTION_ANNOTATION, '')
  return 'Enabled' if setting.lower() == 'true' else ''


def GetStartupProbe(
    container: container_resource.Container,
    labels: Mapping[str, str],
    is_primary: bool,
) -> Union[str, cp.Lines]:
  """Returns the information message for the container's startup probe.

  Args:
    container: The container whose `startupProbe` is described.
    labels: Labels consulted for 'run.googleapis.com/startupProbeType'.
    is_primary: Whether to read the probe type from the labels; when False
      the probe type is left empty.

  Returns:
    The result of _GetProbe for the startup probe and the probe type.
  """
  startup_probe_type = (
      labels.get('run.googleapis.com/startupProbeType', '')
      if is_primary
      else ''
  )
  return _GetProbe(container.startupProbe, startup_probe_type)


def GetLivenessProbe(
    container: container_resource.Container,
) -> Union[str, cp.Lines]:
  """Returns the information message for the container's liveness probe."""
  liveness_probe = container.livenessProbe
  return _GetProbe(liveness_probe)


def GetReadinessProbe(
    container: container_resource.Container,
) -> Union[str, cp.Lines]:
  """Returns the information message for the container's readiness probe."""
  readiness_probe = container.readinessProbe
  return _GetProbe(readiness_probe)


def _GetProbe(probe, probe_type=''):
  """Returns the information message for the given probe."""
  if not probe:
    return ''
  probe_action = 'TCP'
  port = ''
  path = ''
  if probe.httpGet:
    probe_action = 'HTTP'
    path = probe.httpGet.path
  if probe.tcpSocket:
    probe_action = 'TCP'
    port = probe.tcpSocket.port
  if probe.grpc:
    probe_action = 'GRPC'
    port = probe.grpc.port
  return cp.Lines([
      '{probe_action} every {period}s'.format(
          probe_action=probe_action, period=probe.periodSeconds
      ),
      cp.Labeled([
          ('Path', path),
          ('Port', port),
          (
              'Initial delay',
              '{initial_delay}s'.format(
                  initial_delay=probe.initialDelaySeconds or '0'
              ),
          ),
          (
              'Timeout',
              '{timeout}s'.format(timeout=probe.timeoutSeconds),
          ),
          (
              'Success threshold',
              '{successes}'.format(successes=probe.successThreshold or ''),
          ),
          (
              'Failure threshold',
              '{failures}'.format(failures=probe.failureThreshold),
          ),
          ('Type', probe_type),
      ]),
  ])