HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/code/kubernetes.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for generating the files for local development environment."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import subprocess
import sys

from googlecloudsdk.command_lib.code import run_subprocess
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import platforms
from googlecloudsdk.core.util import times
import six

DEFAULT_CLUSTER_NAME = 'gcloud-local-dev'


class _KubeCluster(object):
  """A kubernetes cluster.

  Attributes:
    context_name: Kubernetes context name.
    env_vars: Docker env vars.
    shared_docker: Whether the kubernetes cluster shares a docker instance with
      the developer's machine.
  """

  def __init__(self, context_name, shared_docker):
    """Initializes KubeCluster with cluster name.

    Args:
      context_name: Kubernetes context.
      shared_docker: Whether the kubernetes cluster shares a docker instance
        with the developer's machine.
    """
    self.context_name = context_name
    self.shared_docker = shared_docker

  @property
  def env_vars(self):
    return {}


def GetMinikubeVersion():
  """Returns the current version of minikube."""
  return six.ensure_text(subprocess.check_output([_FindMinikube(), 'version']))


class MinikubeCluster(_KubeCluster):
  """A cluster on minikube.

  Attributes:
    context_name: Kubernetes context name.
    env_vars: Docker environment variables.
    shared_docker: Whether the kubernetes cluster shares a docker instance with
      the developer's machine.
  """

  @property
  def env_vars(self):
    return _GetMinikubeDockerEnvs(self.context_name)


class Minikube(object):
  """Starts and stops a minikube cluster."""

  def __init__(self,
               cluster_name,
               stop_cluster=True,
               vm_driver=None,
               debug=False):
    self._cluster_name = cluster_name
    self._stop_cluster = stop_cluster
    self._vm_driver = vm_driver
    self._debug = debug

  def __enter__(self):
    _StartMinikubeCluster(self._cluster_name, self._vm_driver, self._debug)
    return MinikubeCluster(self._cluster_name, self._vm_driver == 'docker')

  def __exit__(self, exc_type, exc_value, tb):
    if self._stop_cluster:
      _StopMinikube(self._cluster_name, self._debug)


def _FindMinikube():
  return (properties.VALUES.code.minikube_path_override.Get() or
          run_subprocess.GetGcloudPreferredExecutable('minikube'))


class MinikubeStartError(exceptions.Error):
  """Error if minikube fails to start."""


_MINIKUBE_STEP = 'io.k8s.sigs.minikube.step'

_MINIKUBE_DOWNLOAD_PROGRESS = 'io.k8s.sigs.minikube.download.progress'

_MINIKUBE_ERROR = 'io.k8s.sigs.minikube.error'

_MINIKUBE_NOT_ENOUGH_CPU_FRAGMENT = 'The minimum allowed is 2 CPUs.'

# pylint: disable=line-too-long
# See https://github.com/kubernetes/minikube/blob/master/pkg/minikube/reason/exitcodes.go
# pylint: enable=line-too-long
_MINIKUBE_ERROR_MESSAGES = {
    '29': 'Not enough CPUs. Cloud Run Emulator requires 2 CPUs.',
    '69': 'Cannot reach docker daemon.',
}

_MINIKUBE_PASSTHROUGH_ADVICE_IDS = frozenset(['HOST_HOME_PERMISSION'])

if platforms.OperatingSystem.Current() != platforms.OperatingSystem.LINUX:
  _MINIKUBE_ERROR_MESSAGES['29'] += ' Increase Docker VM CPUs to 2.'


def _StartMinikubeCluster(cluster_name, vm_driver, debug=False):
  """Starts a minikube cluster."""
  # pylint: disable=broad-except
  try:
    if not _IsMinikubeClusterUp(cluster_name):
      cmd = [
          _FindMinikube(),
          'start',
          '-p',
          cluster_name,
          '--keep-context',
          '--interactive=false',
          '--delete-on-failure',
          '--install-addons=false',
          '--output=json',
      ]
      if vm_driver:
        cmd.append('--vm-driver=' + vm_driver)
        if vm_driver == 'docker':
          cmd.append('--container-runtime=docker')
      if debug:
        cmd.extend(['--alsologtostderr', '-v8'])

      start_msg = "Starting development environment '%s' ..." % cluster_name

      event_timeout = times.ParseDuration(
          properties.VALUES.code.minikube_event_timeout.Get(
              required=True)).total_seconds

      with console_io.ProgressBar(start_msg) as progress_bar:
        for json_obj in run_subprocess.StreamOutputJson(
            cmd, event_timeout_sec=event_timeout, show_stderr=debug):
          if debug:
            print('minikube', json_obj)

          _HandleMinikubeStatusEvent(progress_bar, json_obj)
  except Exception as e:
    six.reraise(MinikubeStartError, e, sys.exc_info()[2])


def _HandleMinikubeStatusEvent(progress_bar, json_obj):
  """Handle a minikube json event."""
  if json_obj['type'] == _MINIKUBE_STEP:
    data = json_obj['data']

    # https://github.com/kubernetes/minikube/issues/9754
    # currentstep and totalsteps could be:
    #   missing -> invalid
    #   ''      -> invalid
    #   '0'     -> ok
    #   0       -> ok
    # pylint:disable=g-explicit-bool-comparison
    if data.get('currentstep', '') != '' and data.get('totalsteps', '') != '':
      current_step = int(data['currentstep'])
      total_steps = int(data['totalsteps'])
      completion_fraction = current_step / float(total_steps)
      progress_bar.SetProgress(completion_fraction)
  elif json_obj['type'] == _MINIKUBE_DOWNLOAD_PROGRESS:
    data = json_obj['data']

    # https://github.com/kubernetes/minikube/issues/9754
    # currentstep and totalsteps could be:
    #   missing -> invalid
    #   ''      -> invalid
    #   '0'     -> ok
    #   0       -> ok
    # pylint:disable=g-explicit-bool-comparison
    if (data.get('currentstep', '') != '' and
        data.get('totalsteps', '') != '' and 'progress' in data):
      current_step = int(data['currentstep'])
      total_steps = int(data['totalsteps'])
      download_progress = float(data['progress'])

      completion_fraction = (current_step + download_progress) / total_steps
      progress_bar.SetProgress(completion_fraction)
  elif (json_obj['type'] == _MINIKUBE_ERROR and 'exitcode' in json_obj['data']):
    data = json_obj['data']
    if ('id' in data and 'advice' in data and
        data['id'] in _MINIKUBE_PASSTHROUGH_ADVICE_IDS):
      raise MinikubeStartError(data['advice'])
    else:
      exit_code = data['exitcode']
      msg = _MINIKUBE_ERROR_MESSAGES.get(exit_code,
                                         'Unable to start Cloud Run Emulator.')
      raise MinikubeStartError(msg)


def _GetMinikubeDockerEnvs(cluster_name):
  """Get the docker environment settings for a given cluster."""
  cmd = [_FindMinikube(), 'docker-env', '-p', cluster_name, '--shell=none']
  lines = run_subprocess.GetOutputLines(cmd, timeout_sec=20)
  return dict(
      line.split('=', 1) for line in lines if line and not line.startswith('#'))


def _IsMinikubeClusterUp(cluster_name):
  """Checks if a minikube cluster is running."""
  cmd = [_FindMinikube(), 'status', '-p', cluster_name, '-o', 'json']
  try:
    status = run_subprocess.GetOutputJson(
        cmd, timeout_sec=20, show_stderr=False)
    return 'Host' in status and status['Host'].strip() == 'Running'
  except (ValueError, subprocess.CalledProcessError):
    return False


def _StopMinikube(cluster_name, debug=False):
  """Stop a minikube cluster."""
  cmd = [_FindMinikube(), 'stop', '-p', cluster_name]
  print("Stopping development environment '%s' ..." % cluster_name)
  run_subprocess.Run(cmd, timeout_sec=150, show_output=debug)
  print('Development environment stopped.')


def DeleteMinikube(cluster_name):
  """Delete a minikube cluster."""
  cmd = [_FindMinikube(), 'delete', '-p', cluster_name]
  print("Deleting development environment '%s' ..." % cluster_name)
  run_subprocess.Run(cmd, timeout_sec=150, show_output=False)
  print('Development environment stopped.')


class ExternalCluster(_KubeCluster):
  """A external kubernetes cluster.

  Attributes:
    context_name: Kubernetes context name.
    env_vars: Docker environment variables.
    shared_docker: Whether the kubernetes cluster shares a docker instance with
      the developer's machine.
  """

  def __init__(self, cluster_name):
    """Initializes ExternalCluster with profile name.

    Args:
      cluster_name: Name of the cluster.
    """
    super(ExternalCluster, self).__init__(cluster_name, False)


class ExternalClusterContext(object):
  """Do nothing context manager for external clusters."""

  def __init__(self, kube_context):
    self._kube_context = kube_context

  def __enter__(self):
    return ExternalCluster(self._kube_context)

  def __exit__(self, exc_type, exc_value, tb):
    pass


def _FindKubectl():
  return run_subprocess.GetGcloudPreferredExecutable('kubectl')


def _NamespaceExists(namespace, context_name=None):
  cmd = [_FindKubectl()]
  if context_name:
    cmd += ['--context', context_name]
  cmd += ['get', 'namespaces', '-o', 'name']
  namespaces = run_subprocess.GetOutputLines(
      cmd, timeout_sec=20, show_stderr=False)
  return 'namespace/' + namespace in namespaces


def _CreateNamespace(namespace, context_name=None):
  cmd = [_FindKubectl()]
  if context_name:
    cmd += ['--context', context_name]
  cmd += ['create', 'namespace', namespace]
  run_subprocess.Run(cmd, timeout_sec=20, show_output=False)


def _DeleteNamespace(namespace, context_name=None):
  cmd = [_FindKubectl()]
  if context_name:
    cmd += ['--context', context_name]
  cmd += ['delete', 'namespace', namespace]
  run_subprocess.Run(cmd, timeout_sec=20, show_output=False)


class KubeNamespace(object):
  """Context to create and tear down kubernetes namespace."""

  def __init__(self, namespace, context_name=None):
    """Initialize KubeNamespace.

    Args:
      namespace: (str) Namespace name.
      context_name: (str) Kubernetes context name.
    """
    self._namespace = namespace
    self._context_name = context_name
    self._delete_namespace = False

  def __enter__(self):
    if not _NamespaceExists(self._namespace, self._context_name):
      _CreateNamespace(self._namespace, self._context_name)
      self._delete_namespace = True

  def __exit__(self, exc_type, exc_value, tb):
    if self._delete_namespace:
      _DeleteNamespace(self._namespace, self._context_name)