File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/code/kubernetes.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for generating the files for local development environment."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import subprocess
import sys
from googlecloudsdk.command_lib.code import run_subprocess
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import platforms
from googlecloudsdk.core.util import times
import six
DEFAULT_CLUSTER_NAME = 'gcloud-local-dev'
class _KubeCluster(object):
  """A kubernetes cluster.
  Attributes:
    context_name: Kubernetes context name.
    env_vars: Docker env vars.
    shared_docker: Whether the kubernetes cluster shares a docker instance with
      the developer's machine.
  """
  def __init__(self, context_name, shared_docker):
    """Initializes KubeCluster with cluster name.
    Args:
      context_name: Kubernetes context.
      shared_docker: Whether the kubernetes cluster shares a docker instance
        with the developer's machine.
    """
    self.context_name = context_name
    self.shared_docker = shared_docker
  @property
  def env_vars(self):
    return {}
def GetMinikubeVersion():
  """Returns the current version of minikube."""
  return six.ensure_text(subprocess.check_output([_FindMinikube(), 'version']))
class MinikubeCluster(_KubeCluster):
  """A cluster on minikube.
  Attributes:
    context_name: Kubernetes context name.
    env_vars: Docker environment variables.
    shared_docker: Whether the kubernetes cluster shares a docker instance with
      the developer's machine.
  """
  @property
  def env_vars(self):
    return _GetMinikubeDockerEnvs(self.context_name)
class Minikube(object):
  """Starts and stops a minikube cluster."""
  def __init__(self,
               cluster_name,
               stop_cluster=True,
               vm_driver=None,
               debug=False):
    self._cluster_name = cluster_name
    self._stop_cluster = stop_cluster
    self._vm_driver = vm_driver
    self._debug = debug
  def __enter__(self):
    _StartMinikubeCluster(self._cluster_name, self._vm_driver, self._debug)
    return MinikubeCluster(self._cluster_name, self._vm_driver == 'docker')
  def __exit__(self, exc_type, exc_value, tb):
    if self._stop_cluster:
      _StopMinikube(self._cluster_name, self._debug)
def _FindMinikube():
  return (properties.VALUES.code.minikube_path_override.Get() or
          run_subprocess.GetGcloudPreferredExecutable('minikube'))
class MinikubeStartError(exceptions.Error):
  """Error if minikube fails to start."""
_MINIKUBE_STEP = 'io.k8s.sigs.minikube.step'
_MINIKUBE_DOWNLOAD_PROGRESS = 'io.k8s.sigs.minikube.download.progress'
_MINIKUBE_ERROR = 'io.k8s.sigs.minikube.error'
_MINIKUBE_NOT_ENOUGH_CPU_FRAGMENT = 'The minimum allowed is 2 CPUs.'
# pylint: disable=line-too-long
# See https://github.com/kubernetes/minikube/blob/master/pkg/minikube/reason/exitcodes.go
# pylint: enable=line-too-long
_MINIKUBE_ERROR_MESSAGES = {
    '29': 'Not enough CPUs. Cloud Run Emulator requires 2 CPUs.',
    '69': 'Cannot reach docker daemon.',
}
_MINIKUBE_PASSTHROUGH_ADVICE_IDS = frozenset(['HOST_HOME_PERMISSION'])
if platforms.OperatingSystem.Current() != platforms.OperatingSystem.LINUX:
  _MINIKUBE_ERROR_MESSAGES['29'] += ' Increase Docker VM CPUs to 2.'
def _StartMinikubeCluster(cluster_name, vm_driver, debug=False):
  """Starts a minikube cluster."""
  # pylint: disable=broad-except
  try:
    if not _IsMinikubeClusterUp(cluster_name):
      cmd = [
          _FindMinikube(),
          'start',
          '-p',
          cluster_name,
          '--keep-context',
          '--interactive=false',
          '--delete-on-failure',
          '--install-addons=false',
          '--output=json',
      ]
      if vm_driver:
        cmd.append('--vm-driver=' + vm_driver)
        if vm_driver == 'docker':
          cmd.append('--container-runtime=docker')
      if debug:
        cmd.extend(['--alsologtostderr', '-v8'])
      start_msg = "Starting development environment '%s' ..." % cluster_name
      event_timeout = times.ParseDuration(
          properties.VALUES.code.minikube_event_timeout.Get(
              required=True)).total_seconds
      with console_io.ProgressBar(start_msg) as progress_bar:
        for json_obj in run_subprocess.StreamOutputJson(
            cmd, event_timeout_sec=event_timeout, show_stderr=debug):
          if debug:
            print('minikube', json_obj)
          _HandleMinikubeStatusEvent(progress_bar, json_obj)
  except Exception as e:
    six.reraise(MinikubeStartError, e, sys.exc_info()[2])
def _HandleMinikubeStatusEvent(progress_bar, json_obj):
  """Handle a minikube json event."""
  if json_obj['type'] == _MINIKUBE_STEP:
    data = json_obj['data']
    # https://github.com/kubernetes/minikube/issues/9754
    # currentstep and totalsteps could be:
    #   missing -> invalid
    #   ''      -> invalid
    #   '0'     -> ok
    #   0       -> ok
    # pylint:disable=g-explicit-bool-comparison
    if data.get('currentstep', '') != '' and data.get('totalsteps', '') != '':
      current_step = int(data['currentstep'])
      total_steps = int(data['totalsteps'])
      completion_fraction = current_step / float(total_steps)
      progress_bar.SetProgress(completion_fraction)
  elif json_obj['type'] == _MINIKUBE_DOWNLOAD_PROGRESS:
    data = json_obj['data']
    # https://github.com/kubernetes/minikube/issues/9754
    # currentstep and totalsteps could be:
    #   missing -> invalid
    #   ''      -> invalid
    #   '0'     -> ok
    #   0       -> ok
    # pylint:disable=g-explicit-bool-comparison
    if (data.get('currentstep', '') != '' and
        data.get('totalsteps', '') != '' and 'progress' in data):
      current_step = int(data['currentstep'])
      total_steps = int(data['totalsteps'])
      download_progress = float(data['progress'])
      completion_fraction = (current_step + download_progress) / total_steps
      progress_bar.SetProgress(completion_fraction)
  elif (json_obj['type'] == _MINIKUBE_ERROR and 'exitcode' in json_obj['data']):
    data = json_obj['data']
    if ('id' in data and 'advice' in data and
        data['id'] in _MINIKUBE_PASSTHROUGH_ADVICE_IDS):
      raise MinikubeStartError(data['advice'])
    else:
      exit_code = data['exitcode']
      msg = _MINIKUBE_ERROR_MESSAGES.get(exit_code,
                                         'Unable to start Cloud Run Emulator.')
      raise MinikubeStartError(msg)
def _GetMinikubeDockerEnvs(cluster_name):
  """Get the docker environment settings for a given cluster."""
  cmd = [_FindMinikube(), 'docker-env', '-p', cluster_name, '--shell=none']
  lines = run_subprocess.GetOutputLines(cmd, timeout_sec=20)
  return dict(
      line.split('=', 1) for line in lines if line and not line.startswith('#'))
def _IsMinikubeClusterUp(cluster_name):
  """Checks if a minikube cluster is running."""
  cmd = [_FindMinikube(), 'status', '-p', cluster_name, '-o', 'json']
  try:
    status = run_subprocess.GetOutputJson(
        cmd, timeout_sec=20, show_stderr=False)
    return 'Host' in status and status['Host'].strip() == 'Running'
  except (ValueError, subprocess.CalledProcessError):
    return False
def _StopMinikube(cluster_name, debug=False):
  """Stop a minikube cluster."""
  cmd = [_FindMinikube(), 'stop', '-p', cluster_name]
  print("Stopping development environment '%s' ..." % cluster_name)
  run_subprocess.Run(cmd, timeout_sec=150, show_output=debug)
  print('Development environment stopped.')
def DeleteMinikube(cluster_name):
  """Delete a minikube cluster."""
  cmd = [_FindMinikube(), 'delete', '-p', cluster_name]
  print("Deleting development environment '%s' ..." % cluster_name)
  run_subprocess.Run(cmd, timeout_sec=150, show_output=False)
  print('Development environment stopped.')
class ExternalCluster(_KubeCluster):
  """A external kubernetes cluster.
  Attributes:
    context_name: Kubernetes context name.
    env_vars: Docker environment variables.
    shared_docker: Whether the kubernetes cluster shares a docker instance with
      the developer's machine.
  """
  def __init__(self, cluster_name):
    """Initializes ExternalCluster with profile name.
    Args:
      cluster_name: Name of the cluster.
    """
    super(ExternalCluster, self).__init__(cluster_name, False)
class ExternalClusterContext(object):
  """Do nothing context manager for external clusters."""
  def __init__(self, kube_context):
    self._kube_context = kube_context
  def __enter__(self):
    return ExternalCluster(self._kube_context)
  def __exit__(self, exc_type, exc_value, tb):
    pass
def _FindKubectl():
  return run_subprocess.GetGcloudPreferredExecutable('kubectl')
def _NamespaceExists(namespace, context_name=None):
  cmd = [_FindKubectl()]
  if context_name:
    cmd += ['--context', context_name]
  cmd += ['get', 'namespaces', '-o', 'name']
  namespaces = run_subprocess.GetOutputLines(
      cmd, timeout_sec=20, show_stderr=False)
  return 'namespace/' + namespace in namespaces
def _CreateNamespace(namespace, context_name=None):
  cmd = [_FindKubectl()]
  if context_name:
    cmd += ['--context', context_name]
  cmd += ['create', 'namespace', namespace]
  run_subprocess.Run(cmd, timeout_sec=20, show_output=False)
def _DeleteNamespace(namespace, context_name=None):
  cmd = [_FindKubectl()]
  if context_name:
    cmd += ['--context', context_name]
  cmd += ['delete', 'namespace', namespace]
  run_subprocess.Run(cmd, timeout_sec=20, show_output=False)
class KubeNamespace(object):
  """Context to create and tear down kubernetes namespace."""
  def __init__(self, namespace, context_name=None):
    """Initialize KubeNamespace.
    Args:
      namespace: (str) Namespace name.
      context_name: (str) Kubernetes context name.
    """
    self._namespace = namespace
    self._context_name = context_name
    self._delete_namespace = False
  def __enter__(self):
    if not _NamespaceExists(self._namespace, self._context_name):
      _CreateNamespace(self._namespace, self._context_name)
      self._delete_namespace = True
  def __exit__(self, exc_type, exc_value, tb):
    if self._delete_namespace:
      _DeleteNamespace(self._namespace, self._context_name)