HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/code/local.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for manipulating serverless local development setup."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import base64
import collections
import json
import os
import os.path
import re
import uuid

from apitools.base.py import encoding_helper
from apitools.base.py import exceptions as apitools_exceptions
from googlecloudsdk.api_lib.app import yaml_parsing as app_engine_yaml_parsing
from googlecloudsdk.api_lib.run import container_resource
from googlecloudsdk.api_lib.run import service as k8s_service
from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.api_lib.util import messages as messages_util
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.auth import auth_util
from googlecloudsdk.command_lib.code import builders
from googlecloudsdk.command_lib.code import common
from googlecloudsdk.command_lib.code import dataobject
from googlecloudsdk.command_lib.code import secrets
from googlecloudsdk.command_lib.code import yaml_helper
from googlecloudsdk.command_lib.iam import iam_util
from googlecloudsdk.command_lib.run import secrets_mapping
from googlecloudsdk.core import config
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import yaml
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import files
import six

IAM_MESSAGE_MODULE = apis.GetMessagesModule('iam', 'v1')
CRM_MESSAGE_MODULE = apis.GetMessagesModule('cloudresourcemanager', 'v1')
RUN_MESSAGES_MODULE = apis.GetMessagesModule('run', 'v1')

_DEFAULT_BUILDPACK_BUILDER = 'gcr.io/buildpacks/builder'

_C_IDENTIFIER = r'^[a-zA-Z_][a-zA-Z_0-9]*$'

# A sentinal used for unspecified flag values. We are using an object as a
# sentinal value instead of None so that for boolean flags None (unset) isn't
# confused the set value False.
_FLAG_UNSPECIFIED = object()


class _ParseError(TypeError):
  """File does not parse with the expected schema."""


class ServiceAccountSetting(dataobject.DataObject):
  """Setting object representing a service account."""
  NAMES = ('name',)


class ApplicationDefaultCredentialSetting(dataobject.DataObject):
  """Setting object representing the application default credential."""

  _instance = None

  def __new__(cls, **kwargs):
    if not cls._instance:
      cls._instance = super(ApplicationDefaultCredentialSetting,
                            cls).__new__(cls, **kwargs)

    return cls._instance


def _GaeBuilderPackagePath(runtime):
  """GCR package path for a builder that works on the given appengine runtime.

  Args:
    runtime: Name of a runtime from app.yaml, e.g. 'python38'.

  Returns:
    gcr.io image path.
  """
  return 'gcr.io/gae-runtimes/buildpacks/%s/builder:latest' % runtime


def _IsGcpBaseBuilder(bldr):
  """Return true if the builder is the GCP base builder.

  Args:
    bldr: Name of the builder.

  Returns:
    True if the builder is the GCP base builder.
  """
  return bldr == _DEFAULT_BUILDPACK_BUILDER


class _SecretPath(dataobject.DataObject):
  """Configuration for a single secret version.

    Attributes:
      key: The secret version to mount.
      path: The file path to mount it on.
  """

  NAMES = ('key', 'path')


class _SecretVolume(dataobject.DataObject):
  """Configuration for a single volume that mounts secret versions.

    Attributes:
      name: (str) The name of the volume to be referenced in the k8s resource.
      mount_path: (str) The filesystem location where the volume is mounted.
      secret_name: (str) The secret manager reference.
      mapped_secret: (str) If set, the path of the secret in another project to
        use. For example 'projects/123/secrets/foo'.
      items: (List[SecretPath]) The list of keys and paths for the secret.
  """
  NAMES = ('name', 'mapped_secret', 'mount_path', 'secret_name', 'items')

  @classmethod
  def FromParsedYaml(cls,
                     mount,
                     volume_secret,
                     secret_aliases,
                     renamed_secrets=None):
    """Make a SecretVolume based on the volumeMount and secret from the yaml."""
    items = []
    renamed_secrets = renamed_secrets or {}
    for item in volume_secret.items:
      items.append(_SecretPath(key=item.key, path=item.path))
    name = volume_secret.secretName
    mapped_secret = secret_aliases.get(volume_secret.secretName, None)
    if (not mapped_secret and not secrets.IsValidK8sName(name)):
      mapped_secret = name
      name = renamed_secrets.get(name, None)
      if not name:
        name, mapped_secret = _BuildReachableSecret(volume_secret.secretName)
        renamed_secrets[mapped_secret] = name
    return cls(
        name=mount.name,
        mount_path=mount.mountPath,
        secret_name=name,
        items=items,
        mapped_secret=mapped_secret)

  def IsSameSecret(self, other):
    if self.mapped_secret or other.mapped_secret:
      return self.mapped_secret == other.mapped_secret
    return self.secret_name == other.secret_name


class _SecretEnvVar(dataobject.DataObject):
  """Configuration for a single env var that's pulled from a secret.

  Attributes:
    name: (str) the name of the secret in the resource
    key: (str) the secret version to use
    mapped_secret: (str) if set, the path of the secret from the other project
      that this secret will pull from. For example 'projects/123/secrets/foo'.
      If not set, will pull from the secret called 'name' in the current
      project.
  """
  NAMES = ('name', 'key', 'mapped_secret')


class Settings(dataobject.DataObject):
  """Settings for local development environments.

    Attributes:
      service_name: Name of the kuberntes service.
      image: Docker image tag.
      credential: Credential setting for either service account or application
        default credential.
      context: Path to directory to use as the current working directory for the
        docker build.
      builder: The builder specification or None.
      local_port: Local port to which to forward the service connection.
      env_vars: Container environment variables.
      env_vars_secrets: Container environment variables where the values come
        from Secret Manager.
      volumes_secrets: Volumes where the values come from secret manager.
      renamed_secrets: internal map of secrets that don't correspond to k8s
        naming conventions.
      cloudsql_instances: Cloud SQL instances.
      memory: Memory limit.
      cpu: CPU limit.
      namespace: Kubernetes namespace to run in.
      readiness_probe: If true, create readiness probe.
      allow_secret_manager: If true, allow fetching secrets from secret manager
  """

  NAMES = ('service_name', 'image', 'credential', 'context', 'builder',
           'local_port', 'env_vars', 'env_vars_secrets', 'volumes_secrets',
           'renamed_secrets', 'cloudsql_instances', 'memory', 'cpu',
           'namespace', 'readiness_probe', 'allow_secret_manager')

  @classmethod
  def Defaults(cls):
    """The settings you get with no args or other overrides."""

    dir_name = os.path.basename(files.GetCWD())
    # Service names may not include space, _ and upper case characters.
    service_name = dir_name.replace('_', '-').replace(' ', '-').lower()
    dockerfile_arg_default = 'Dockerfile'
    builder = builders.DockerfileBuilder(dockerfile=dockerfile_arg_default)

    return cls(
        builder=builder,
        cloudsql_instances=[],
        context=os.path.abspath(files.GetCWD()),
        image=None,  # See Build() below.
        service_name=service_name,
        env_vars={},
        env_vars_secrets={},
        volumes_secrets=[],
        renamed_secrets={},
        allow_secret_manager=_FLAG_UNSPECIFIED,
    )

  def WithServiceYaml(self, yaml_path, is_alpha=False):
    """Overrides settings with service.yaml and returns a new Settings object.

    Args:
      yaml_path: Filename to read.
      is_alpha: boolean set to true if alpha features should be used.

    Returns:
      New Settings object.

    Raises:
      _ParseError: Input does not look like a service.yaml
    """
    knative_service = self._ParseServiceYaml(yaml_path)

    replacements = {
        'service_name': knative_service.metadata.name,
    }

    try:
      [container] = knative_service.spec.template.spec.containers
    except ValueError:
      raise exceptions.Error('knative Service must have exactly one container.')

    # If there are any secrets referenced from other projects, there will be
    # an alias specified
    secret_aliases = {}
    aliases = knative_service.annotations.get(
        container_resource.SECRETS_ANNOTATION, '')
    secret_aliases = {
        k: v.FormatAnnotationItem()
        for k, v in secrets_mapping.ParseAnnotation(aliases, True).items()
    }

    # The secrets aliases are supposed to be specified in the revision Template
    # but we originally specified the wrong location.
    if knative_service.template.IsFullObject():
      template_aliases = knative_service.template_annotations.get(
          container_resource.SECRETS_ANNOTATION, '')
      secret_aliases.update({
          k:
          v.FormatAnnotationItem() for k, v in secrets_mapping.ParseAnnotation(
              template_aliases, True).items()
      })
    new_env_vars = {}
    new_env_vars_secrets = {}
    for var in container.env:
      if var.valueFrom:
        if var.valueFrom.configMapKeyRef:
          raise exceptions.Error('env_vars from config_maps are not supported')
        elif var.valueFrom.secretKeyRef:
          secret_name = var.valueFrom.secretKeyRef.name
          mapped_secret = secret_aliases.get(secret_name, None)

          if not mapped_secret and not secrets.IsValidK8sName(secret_name):
            mapped_secret = secret_name
            secret_name = self.renamed_secrets.get(mapped_secret, None)
            if not secret_name:
              secret_name, mapped_secret = _BuildReachableSecret(mapped_secret)
              self.renamed_secrets[mapped_secret] = secret_name
          new_env_vars_secrets[var.name] = _SecretEnvVar(
              key=var.valueFrom.secretKeyRef.key,
              name=secret_name,
              mapped_secret=mapped_secret)
      else:
        new_env_vars[var.name] = var.value
    replacements.update(
        _MergedEnvVars(self.env_vars, self.env_vars_secrets, new_env_vars,
                       new_env_vars_secrets))
    service_account_name = knative_service.spec.template.spec.serviceAccountName
    if service_account_name:
      replacements['credential'] = ServiceAccountSetting(
          name=service_account_name)

    image_name = container.image
    if image_name:
      replacements['image'] = image_name

    replacements.update(self._ResourceRequests(container))

    all_vols = {}
    for vol in knative_service.spec.template.spec.volumes:
      if vol.secret:
        all_vols[vol.name] = vol.secret
      else:
        raise exceptions.Error('Could not process volume "{}". Only volumes '
                               'from secrets are supported.'.format(vol.name))
    referenced_vols = []
    for vol in container.volumeMounts:
      if vol.name not in all_vols:
        raise exceptions.Error('Container referenced volume "{}" which was not '
                               'found.'.format(vol.name))
      referenced_vols.append(
          _SecretVolume.FromParsedYaml(vol, all_vols[vol.name], secret_aliases,
                                       self.renamed_secrets))
    replacements['volumes_secrets'] = referenced_vols

    return self.replace(**replacements)

  def _ParseServiceYaml(self, yaml_path):
    try:
      yaml_dict = yaml.load_path(yaml_path)
      message = messages_util.DictToMessageWithErrorCheck(
          yaml_dict, RUN_MESSAGES_MODULE.Service)
      knative_service = k8s_service.Service(message, RUN_MESSAGES_MODULE)
    except (yaml.Error, messages_util.DecodeError):
      raise _ParseError()
    return knative_service

  def _ResourceRequests(self, container):
    if not container.resources or not container.resources.limits:
      return {}
    ret = {}
    for prop in container.resources.limits.additionalProperties:
      if prop.key == 'cpu':
        ret['cpu'] = prop.value
      if prop.key == 'memory':
        ret['memory'] = prop.value
    return ret

  def WithAppYaml(self, yaml_path):
    """Overrides settings with app.yaml and returns a new Settings object.

    Args:
      yaml_path: Filename to read.

    Returns:
      New Settings object.

    Raises:
      ParseError: Input does not look like an app.yaml
    """
    try:
      service_config = app_engine_yaml_parsing.ServiceYamlInfo.FromFile(
          yaml_path)
    except yaml.Error:
      raise _ParseError()
    builder_url = _GaeBuilderPackagePath(service_config.parsed.runtime)
    builder = builders.BuildpackBuilder(
        builder=builder_url, trust=True, devmode=False)
    return self.replace(builder=builder)

  def WithArgs(self, args):
    """Overrides settings with args and returns a new Settings object."""

    replacements = {}
    for override_arg in [
        'local_port',
        'memory',
        'cpu',
        'namespace',
        'readiness_probe',
        'cloudsql_instances',
        'image',
        'service_name',
        'allow_secret_manager',
    ]:
      if args.IsKnownAndSpecified(override_arg):
        replacements[override_arg] = getattr(args, override_arg)

    if args.IsSpecified('application_default_credential'):
      replacements['credential'] = ApplicationDefaultCredentialSetting()
    elif args.IsSpecified('service_account'):
      replacements['credential'] = ServiceAccountSetting(
          name=args.service_account)
    context = self.context
    if args.source:
      context = os.path.abspath(args.source)
    replacements['context'] = context

    if getattr(args, 'no_skaffold_file', False):
      replacements['builder'] = None
    else:
      if args.IsKnownAndSpecified('builder'):
        replacements['builder'] = _BuilderFromArg(args.builder)
      elif args.IsKnownAndSpecified('dockerfile'):
        replacements['builder'] = builders.DockerfileBuilder(
            dockerfile=args.dockerfile)
      else:
        if isinstance(self.builder, builders.DockerfileBuilder):
          try:
            replacements['builder'] = self.builder
            replacements['builder'].Validate(context)
          except builders.InvalidLocationError:
            log.status.Print('No Dockerfile detected. '
                             'Using GCP buildpacks to build the container')
            replacements['builder'] = _BuilderFromArg(
                _DEFAULT_BUILDPACK_BUILDER)
    if getattr(args, 'env_vars', None):
      new_envs = args.env_vars
    else:
      new_envs = getattr(args, 'env_vars_file', {}) or {}

    new_env_secrets, new_sec_volumes = self._GetSecrets(
        getattr(args, 'secrets', {}) or {})
    replacements.update(
        _MergedEnvVars(self.env_vars, self.env_vars_secrets, new_envs,
                       new_env_secrets))

    replacements['volumes_secrets'] = _MergeSecretVolumes(
        self.volumes_secrets, new_sec_volumes)
    return self.replace(**replacements)

  def _GetSecrets(self, secrets_args):
    env_vars = {}  # name: _SecretEnevVar
    volumes = {}  # mount_path: _SecretVolume
    aliases = {}  # secret_path: local_name

    for key, secret in secrets_args.items():
      parts = secret.split(':')
      if len(parts) != 2:
        raise exceptions.Error('Expected secret to be of form '
                               '<secretName>:<version>, got {}'.format(secret))
      secret_name, version = parts
      mapped_secret = None
      if secret_name.startswith('projects/'):
        # it's a remote secret, need to alias it
        mapped_secret = secret_name
        if mapped_secret not in aliases:
          secret_name = (
              secret_name.lower().replace('_', '.')[:5] + '-' +
              str(uuid.uuid1()))
          aliases[mapped_secret] = secret_name
        else:
          secret_name = aliases[mapped_secret]
      elif not secrets.IsValidK8sName(secret_name):
        if secret_name in self.renamed_secrets:
          mapped_secret = secret_name
          secret_name = self.renamed_secrets[secret_name]
        else:
          secret_name, mapped_secret = _BuildReachableSecret(secret_name)

      if key.startswith('/'):  # it's a volume
        # the volume path is all but the last segment
        mount_path, filename = os.path.split(key)
        if mount_path not in volumes:
          volume = _SecretVolume(
              name=secret_name,
              mount_path=mount_path,
              secret_name=secret_name,
              items=[],
              mapped_secret=mapped_secret)
          volumes[mount_path] = volume
        else:
          volume = volumes[mount_path]
        volume.items.append(_SecretPath(key=version, path=filename))
      else:
        env_vars[key] = _SecretEnvVar(
            name=secret_name, key=version, mapped_secret=mapped_secret)
    return env_vars, list(volumes.values())

  def Build(self):
    """Validate and compute settings after all user inputs have been read."""
    if isinstance(self.builder, builders.DockerfileBuilder):
      self.builder.Validate(self.context)
    replacements = {}
    if self.image is None:
      replacements['image'] = _DefaultImageName(self.service_name)
    return self.replace(**replacements)


def _MergedEnvVars(env_vars, env_vars_secrets, new_env_vars,
                   new_env_vars_secrets):
  """Add the new env vars (both values and secrets) to the existing ones."""

  env_vars = env_vars.copy()
  env_vars_secrets = env_vars_secrets.copy()

  # Env Vars can either be values or secrets, but not both.
  # If the variable is set as both, error.
  conflicts = set(new_env_vars).intersection(new_env_vars_secrets)
  if conflicts:
    raise exceptions.Error('{} cannot be secret and literal'.format(conflicts))

  # If the user is overriding an existing env var that was a secret with a
  # literal or vice versa, make sure to remove the old value from the other
  # dict.
  for new_env, val in new_env_vars.items():
    if new_env in env_vars_secrets:
      del env_vars_secrets[new_env]
    env_vars[new_env] = val
  for new_secret, val in new_env_vars_secrets.items():
    if new_secret in env_vars:
      del env_vars[new_secret]
    env_vars_secrets[new_secret] = val
  return {'env_vars': env_vars, 'env_vars_secrets': env_vars_secrets}


def _MergeSecretVolumes(cur_vols, new_vols):
  """Combine the secret volumes from existing source with new volumes."""
  vol_map = {vol.mount_path: vol for vol in cur_vols}
  for new_vol in new_vols:
    if new_vol.mount_path not in vol_map:
      vol_map[new_vol.mount_path] = new_vol
    else:
      cur_vol = vol_map[new_vol.mount_path]
      if cur_vol.IsSameSecret(new_vol):
        cur_items = {item.path: item for item in cur_vol.items}
        new_items = {item.path: item for item in new_vol.items}
        cur_items.update(new_items)
        cur_vol.items[:] = list(cur_items.values())
      else:
        # replace the previous secret with the new secret
        vol_map[new_vol.mount_path] = new_vol
  return list(vol_map.values())


def AssembleSettings(args, release_track):
  """Layer the defaults, service.yaml/app.yaml values, and cmdline overrides."""

  # See flags.py for where we set help text based on release track, too.
  service_config_may_be_app_yaml = release_track == base.ReleaseTrack.ALPHA

  settings = Settings.Defaults()
  context_dir = getattr(args, 'source', None) or os.path.curdir
  service_config_arg = getattr(args, 'service_config', None)
  yaml_file = common.ChooseExistingServiceYaml(context_dir, service_config_arg)
  if yaml_file:
    try:
      settings = settings.WithServiceYaml(
          yaml_file, release_track == base.ReleaseTrack.ALPHA)
    except _ParseError:
      if service_config_may_be_app_yaml:
        try:
          settings = settings.WithAppYaml(yaml_file)
        except _ParseError:
          raise exceptions.Error(
              '%r is unreadable as a service.yaml or app.yaml file.' %
              yaml_file)
      else:
        raise exceptions.Error('%r is unreadable as a service.yaml file.' %
                               yaml_file)
  settings = settings.WithArgs(args)
  settings = settings.Build()
  return settings


def _DefaultImageName(service_name):
  """Computes a default image name."""
  project_name = properties.VALUES.core.project.Get()
  if project_name:
    image = 'gcr.io/{project}/{service}'.format(
        project=project_name, service=service_name)
  else:
    image = service_name

  # Image names cannot have upper case characters. If the image name is
  # autogenerated, then make sure the image name is lower case. If the
  # user enters an image name manually, it's OK to stop the user and tell
  # them about the illegal image name.
  image = image.lower()
  return image


def _BuilderFromArg(builder_arg):
  is_gcp_base_builder = _IsGcpBaseBuilder(builder_arg)
  return builders.BuildpackBuilder(
      builder=builder_arg,
      trust=is_gcp_base_builder,
      devmode=is_gcp_base_builder)


_POD_TEMPLATE = """
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {service}
  labels:
    service: {service}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: {service}
  template:
    metadata:
      labels:
        app: {service}
    spec:
      containers: []
      terminationGracePeriodSeconds: 0
"""

_CONTAINER_TEMPLATE = """
name: {service}-container
image: {image}
env:
- name: PORT
  value: "8080"
ports:
- containerPort: 8080
"""

# The readiness probe container sits in an infinite wait loop and waits until
# port 8080 (1F90 in hex) is in use. This prevents the reload cycle from being
# considered "complete" until the developer's application binds to $PORT.
_READINESS_PROBE_CONTAINER_TEMPLATE = """
name: {service}-readiness-probe
image: gcr.io/gcp-runtimes/ubuntu_18_0_4:latest
command: ["sleep"]
args: ["infinity"]
readinessProbe:
  exec:
    command:
    - "grep"
    - ":1F90"
    - "/proc/net/tcp"
    - "/proc/net/tcp6"
  periodSeconds: 1
"""


def _CreateDeployment(service_name,
                      image_name,
                      memory_limit=None,
                      cpu_limit=None,
                      cpu_request=None,
                      readiness_probe=False):
  """Create a deployment specification for a service.

  Args:
    service_name: Name of the service.
    image_name: Image tag.
    memory_limit: Container memory limit.
    cpu_limit: Container cpu limit.
    cpu_request: Container cpu request.
    readiness_probe: If true, add a readiness probe.

  Returns:
    Dictionary object representing the deployment yaml, and the main container.
  """
  deployment = yaml.load(_POD_TEMPLATE.format(service=service_name))
  container = yaml.load(
      _CONTAINER_TEMPLATE.format(service=service_name, image=image_name))
  if memory_limit is not None:
    limits = yaml_helper.GetOrCreate(container, ('resources', 'limits'))
    limits['memory'] = memory_limit
  if cpu_limit is not None:
    limits = yaml_helper.GetOrCreate(container, ('resources', 'limits'))
    limits['cpu'] = six.text_type(cpu_limit)
  if cpu_request is not None:
    requests = yaml_helper.GetOrCreate(container, ('resources', 'requests'))
    requests['cpu'] = six.text_type(cpu_request)
  containers = yaml_helper.GetOrCreate(
      deployment, ('spec', 'template', 'spec', 'containers'), constructor=list)
  containers.append(container)

  if readiness_probe:
    readiness_container = yaml.load(
        _READINESS_PROBE_CONTAINER_TEMPLATE.format(service=service_name))
    containers.append(readiness_container)

  return deployment, container


_SERVICE_TEMPLATE = """
apiVersion: v1
kind: Service
metadata:
  name: {service}
spec:
  type: LoadBalancer
  selector:
    app: {service}
  ports:
  - protocol: TCP
    port: 8080
    targetPort: 8080
"""


def CreateService(service_name):
  """Create a service specification.

  Args:
    service_name: Name of the service.

  Returns:
    Dictionary objects representing the service yaml.
  """
  yaml_text = _SERVICE_TEMPLATE.format(service=service_name)
  return yaml.load(yaml_text)


def _AddEnvironmentVariables(container, env_vars):
  """Add environment variable settings to a container.

  Args:
    container: (dict) Container to edit.
    env_vars: (dict) Key value environment variable pairs.
  """
  env_list = yaml_helper.GetOrCreate(container, ('env',), constructor=list)
  invalid_keys = []
  for key, value in sorted(env_vars.items()):
    if not re.match(_C_IDENTIFIER, key):
      invalid_keys.append(six.ensure_str(key))
      continue
    env_list.append({'name': key, 'value': value})
  if invalid_keys:
    raise ValueError('Environment variable name must be a C_IDENTIFIER. '
                     'Invalid names: %r' % invalid_keys)


def _AddSecretEnvironmentVariables(container, env_vars_secrets):
  """Add environment variables from secrets to a container.

  Args:
    container: (dict) Container to edit.
    env_vars_secrets: (dict) Key value environment variable pairs. Values are
      dict with key/name keys in them.
  """
  env_list = yaml_helper.GetOrCreate(container, ('env',), constructor=list)
  for key, value in sorted(env_vars_secrets.items()):
    env_list.append({
        'name': key,
        'valueFrom': {
            'secretKeyRef': {
                'name': value.name,
                'key': value.key
            }
        }
    })


def CreateDevelopmentServiceAccount(service_account_email):
  """Creates a service account for local development.

  Args:
    service_account_email: Email of the service account.

  Returns:
    The resource name of the service account.
  """
  project_id = _GetServiceAccountProject(service_account_email)
  service_account_name = 'projects/{project}/serviceAccounts/{account}'.format(
      project=project_id, account=service_account_email)

  exists = _ServiceAccountExists(service_account_name)
  if _IsReservedServiceAccountName(service_account_email):
    if not exists:
      raise ValueError('%s cannot be created because it is a service '
                       'account name' % service_account_email)
    else:
      return service_account_name

  if not exists:
    account_id = _GetServiceAccountId(service_account_email)
    _CreateAccount('Serverless Local Development Service Account', account_id,
                   project_id)

    permission_msg = ('The project editor role allows the service account '
                      'to create, delete, and modify most resources in the '
                      'project.')
    prompt_string = (
        'Add project editor role to {}?'.format(service_account_email))
    # Make the service account an editor on the project
    if console_io.PromptContinue(
        message=permission_msg, prompt_string=prompt_string):
      _AddBinding(project_id, 'serviceAccount:' + service_account_email,
                  'roles/editor')

  return service_account_name


# Regular expression for parsing a service account email address.
# Format is [id]@[project].iam.gserviceaccount.com
_PROJECT_SERVICE_ACCOUNT_RE = re.compile(
    r'(?P<id>[^@]+)@(?P<project>[^\.]+).iam.gserviceaccount.com')

# Regular expression for parsing a compute service account email address.
# Format is [project-id]-compute@developer.gserviceaccount.com
_APPENGINE_SERVICE_ACCOUNT = re.compile(
    r'(?P<project_id>[^\.]+).google.com@appspot.gserviceaccount.com')

# Regular expression for parsing a compute service account email address.
# Format is [project-number]-compute@developer.gserviceaccount.com
_COMPUTE_SERVICE_ACCOUNT = re.compile(
    r'(?P<project_number>\d+)-compute@developer.gserviceaccount.com')


def _GetServiceAccountProject(service_account_email):
  """Get the project id from a service account email.

  Args:
    service_account_email: (str) Email address of service account.

  Returns:
    The project id of the project to which the service account belongs.
  """
  matcher = _PROJECT_SERVICE_ACCOUNT_RE.match(service_account_email)
  if matcher:
    return matcher.group('project')

  matcher = _APPENGINE_SERVICE_ACCOUNT.match(service_account_email)
  if matcher:
    return matcher.group('project_id')

  matcher = _COMPUTE_SERVICE_ACCOUNT.match(service_account_email)
  if matcher:
    return _ProjectNumberToId(matcher.group('project_number'))

  raise ValueError(service_account_email +
                   ' is not a valid service account address')


_SERVICE_ACCOUNT_RE = re.compile(r'(?P<id>[^@]+)@.*\.gserviceaccount\.com')


def _GetServiceAccountId(service_account_email):
  matcher = _SERVICE_ACCOUNT_RE.match(service_account_email)
  if not matcher:
    raise ValueError(service_account_email +
                     ' is not a valid service account address')
  return matcher.group('id')


def _ProjectNumberToId(project_number):
  """Coverts project number to project id.

  Args:
    project_number: (str) The project number as a string.

  Returns:
    The project id.
  """
  resource_manager = apis.GetClientInstance('cloudresourcemanager', 'v1')
  req = CRM_MESSAGE_MODULE.CloudresourcemanagerProjectsGetRequest(
      projectId=project_number)
  project = resource_manager.projects.Get(req)
  return six.ensure_text(project.projectId)


def _IsReservedServiceAccountName(service_account_email):
  return (_APPENGINE_SERVICE_ACCOUNT.match(service_account_email) or
          _COMPUTE_SERVICE_ACCOUNT.match(service_account_email))


def _ServiceAccountExists(service_account_name):
  """Tests if service account email.

  Args:
    service_account_name: (str) Service account resource name.

  Returns:
    True if the service account exists.
  """
  service = apis.GetClientInstance('iam', 'v1')
  try:
    request = IAM_MESSAGE_MODULE.IamProjectsServiceAccountsGetRequest(
        name=service_account_name)
    service.projects_serviceAccounts.Get(request)
    return True
  except apitools_exceptions.HttpNotFoundError:
    return False


def _CreateAccount(display_name, account_id, project):
  """Create an account if it does not already exist.

  Args:
    display_name: (str) Display name.
    account_id: (str) User account id.
    project: (str) Project name.
  """
  service = apis.GetClientInstance('iam', 'v1')
  try:
    service_account_msg = IAM_MESSAGE_MODULE.ServiceAccount(
        displayName=display_name)
    request = IAM_MESSAGE_MODULE.CreateServiceAccountRequest(
        accountId=account_id, serviceAccount=service_account_msg)
    service.projects_serviceAccounts.Create(
        IAM_MESSAGE_MODULE.IamProjectsServiceAccountsCreateRequest(
            name='projects/' + project, createServiceAccountRequest=request))
  except apitools_exceptions.HttpConflictError:
    # If account already exists, we can ignore the error
    pass


def _AddBinding(project, account, role):
  """Adds a binding.

  Args:
    project: (str) Project name.
    account: (str) User account.
    role: (str) Role.
  """
  crm_client = apis.GetClientInstance('cloudresourcemanager', 'v1')
  policy = crm_client.projects.GetIamPolicy(
      CRM_MESSAGE_MODULE.CloudresourcemanagerProjectsGetIamPolicyRequest(
          resource=project))

  if not iam_util.BindingInPolicy(policy, account, role):
    iam_util.AddBindingToIamPolicy(CRM_MESSAGE_MODULE.Binding, policy, account,
                                   role)
    req = CRM_MESSAGE_MODULE.CloudresourcemanagerProjectsSetIamPolicyRequest(
        resource=project,
        setIamPolicyRequest=CRM_MESSAGE_MODULE.SetIamPolicyRequest(
            policy=policy))
    crm_client.projects.SetIamPolicy(req)


class KubeConfigGenerator(object):
  """The base code generator with default return values.

  Subclasses may override any of the member methods.
  """

  def CreateConfigs(self):
    """Create top level kubernetes configs.

    Returns:
      List of kubernetes configuration yamls encoded as dictionaries.
    """
    return []

  def ModifyDeployment(self, deployment):
    """Modify a deployment.

    Subclasses that override this method should use this method for adding
    or deleting resources (e.g. containers, volumes, metadata) to the
    deployment.

    Args:
      deployment: (dict) Deployment yaml in dictionary form.
    """

  def ModifyContainer(self, container):
    """Modify a container.

    Subclasses that override this method should use this method for adding,
    deleting, or modifying any of the yaml for a container.

    Args:
      container: (dict) Container yaml in dictionary form.
    """


class AppContainerGenerator(KubeConfigGenerator):
  """Generate deployment and service for a developer's app."""

  def __init__(self,
               service_name,
               image_name,
               env_vars=None,
               env_vars_secrets=None,
               memory_limit=None,
               cpu_limit=None,
               cpu_request=None,
               readiness_probe=False):
    self._service_name = service_name
    self._image_name = image_name
    self._env_vars = env_vars
    self._env_vars_secrets = env_vars_secrets
    self._memory_limit = memory_limit
    self._cpu_limit = cpu_limit
    self._cpu_request = cpu_request
    self._readiness_probe = readiness_probe

  def CreateConfigs(self):
    deployment, container = _CreateDeployment(
        self._service_name, self._image_name, self._memory_limit,
        self._cpu_limit, self._cpu_request, self._readiness_probe)
    default_env_vars = {
        'K_SERVICE': self._service_name,
        'K_CONFIGURATION': 'dev',
        'K_REVISION': 'dev-0001',
    }

    _AddEnvironmentVariables(container, default_env_vars)
    if self._env_vars:
      _AddEnvironmentVariables(container, self._env_vars)
    if self._env_vars_secrets:
      _AddSecretEnvironmentVariables(container, self._env_vars_secrets)
    service = CreateService(self._service_name)
    return [deployment, service]


class SecretInfo(object):
  """Information about a generated secret."""

  def __init__(self):
    self.secret_name = 'local-development-credential'
    self.path = ('/etc/' + self.secret_name.replace('-', '_') +
                 '/local_development_service_account.json')


def GetServiceAccountSecret(account_name):
  """Get a service account secret file as text.

  Args:
    account_name: (str) Name of the service account.

  Returns:
    Context of the service account secret as a string.
  """
  service_account = CreateDevelopmentServiceAccount(account_name)
  return CreateServiceAccountKey(service_account)


def GetUserCredential():
  """Get a copy of the application default credential for a user.

  Returns:
    Text version of the user's application default credential.
  """
  auth_util.AssertADCExists()
  return json.dumps(auth_util.GetADCAsJson())


class CredentialGenerator(KubeConfigGenerator):
  """Configures service account secret."""

  def __init__(self, credential_fetcher):
    self._credential_fetcher = credential_fetcher

  def GetInfo(self):
    return SecretInfo()

  def CreateConfigs(self):
    """Create a secret."""
    return [LocalDevelopmentSecretSpec(self._credential_fetcher())]

  def ModifyDeployment(self, deployment):
    """Add a secret volume to a deployment."""
    secret_info = self.GetInfo()
    volumes = yaml_helper.GetOrCreate(deployment,
                                      ('spec', 'template', 'spec', 'volumes'),
                                      list)
    _AddSecretVolume(volumes, secret_info.secret_name)

  def ModifyContainer(self, container):
    """Add volume mount and set application credential environment variable."""
    secret_info = self.GetInfo()
    mounts = yaml_helper.GetOrCreate(container, ('volumeMounts',), list)
    _AddSecretVolumeMount(mounts, secret_info.secret_name)
    envs = yaml_helper.GetOrCreate(container, ('env',), list)
    _AddSecretEnvVar(envs, secret_info.path)


_CLOUD_PROXY_CONTAINER_NAME = 'cloud-sql-proxy'


class CloudSqlProxyGenerator(KubeConfigGenerator):
  """Generate kubernetes configurations for a Cloud SQL proxy connection."""

  def __init__(self, instance_names, secret_info):
    self._instance_names = instance_names
    self._secret_info = secret_info

  def ModifyDeployment(self, deployment):
    """Add sidecar container and empty volume for unix socket."""
    volumes = yaml_helper.GetOrCreate(
        deployment, ('spec', 'template', 'spec', 'volumes'), constructor=list)
    volumes.append({'name': 'cloudsql', 'emptyDir': {}})

    containers = yaml_helper.GetOrCreate(
        deployment, ('spec', 'template', 'spec', 'containers'),
        constructor=list)
    containers.append(
        _CreateCloudSqlProxyContainer(self._instance_names,
                                      self._secret_info.path))

  def ModifyContainer(self, container):
    """Add volume mount to continer.

    This method will not modify the CloudSql proxy container.

    Args:
      container: (dict) Container yaml as a dict.
    """
    if container['name'] == _CLOUD_PROXY_CONTAINER_NAME:
      return
    volume_mounts = yaml_helper.GetOrCreate(
        container, ('volumeMounts',), constructor=list)
    volume_mounts.append({
        'name': 'cloudsql',
        'mountPath': '/cloudsql',
        'readOnly': True
    })


class SecretsNotAllowedError(exceptions.Error):
  """Error thrown when the deploy is not allowed to access secret manager."""
  pass


class SecretsGenerator(KubeConfigGenerator):
  """Generate kubernetes secrets for referenced secrets."""

  def __init__(self,
               service_name,
               env_secrets,
               secret_volumes,
               namespace,
               allow_secret_manager=_FLAG_UNSPECIFIED):
    self.project_name = properties.VALUES.core.project.Get()
    self.service_name = service_name
    self.secret_volumes = secret_volumes
    self.all_secrets = {}

    secrets_builder = collections.defaultdict(set)
    for _, secret in env_secrets.items():
      secrets_builder[(secret.name, secret.mapped_secret)].add(secret.key)

    for secret in secret_volumes:
      if not secret.items:
        # This is currently unsupported for secrets pulled from secret manager.
        secrets_builder[(secret.secret_name, secret.mapped_secret)].add(
            secrets_mapping.SpecialVersion.MOUNT_ALL)
      else:
        for item in secret.items:
          secrets_builder[(secret.secret_name,
                           secret.mapped_secret)].add(item.key)

    for (secret_name, mapped_secret), versions in secrets_builder.items():
      self.all_secrets[secret_name] = secrets.SecretManagerSecret(
          name=secret_name,
          versions=frozenset(versions),
          mapped_secret=mapped_secret)
    self.namespace = namespace
    self.allow_secret_manager = allow_secret_manager

  def CreateConfigs(self):
    if not self.all_secrets:
      return []
    # If secret manager was unspecified, prompt to continue
    if self.allow_secret_manager is _FLAG_UNSPECIFIED:
      requested = []
      for key, sec in self.all_secrets.items():
        if sec.mapped_secret:
          requested.append(sec.mapped_secret)
        else:
          requested.append(key)
      secrets_msg = ('This config references secrets stored in secret manager.'
                     ' Continuing will fetch the secret values and download '
                     'the secrets to your local machine.')
      prompt_string = (
          'Fetch secrets from secret manager for {}?'.format(requested))
      # Make the service account an editor on the project
      if console_io.CanPrompt() and console_io.PromptContinue(
          message=secrets_msg, prompt_string=prompt_string):
        log.status.Print(
            'You can skip this message in the future by passing the '
            'flag --allow-secret-manager')
        self.allow_secret_manager = True

    if (not self.allow_secret_manager or
        self.allow_secret_manager is _FLAG_UNSPECIFIED):
      raise SecretsNotAllowedError(
          'Config requires secrets but access to secret manager was not '
          'allowed. Replace secrets with environment variables or '
          'allow secret manager with --allow-secret-manager to proceed.')
    return secrets.BuildSecrets(self.project_name,
                                set(self.all_secrets.values()), self.namespace)

  def ModifyDeployment(self, deployment):
    # There's only one deployment for now, but let's make sure it's the right
    # one in case there's more later
    if deployment['metadata']['name'] != self.service_name:
      return
    # If there's no volumes, don't do anything
    if not self.secret_volumes:
      return
    volumes = yaml_helper.GetOrCreate(deployment,
                                      ('spec', 'template', 'spec', 'volumes'),
                                      list)
    for volume in self.secret_volumes:
      _AddSecretVolumeByName(volumes, volume.secret_name, volume.name,
                             volume.items)

  def ModifyContainer(self, container):
    if container['name'] != '{}-container'.format(self.service_name):
      return
    if not self.secret_volumes:
      return
    mounts = yaml_helper.GetOrCreate(container, ('volumeMounts',), list)
    for volume in self.secret_volumes:
      _AddVolumeMount(mounts, volume.name, volume.mount_path)


_CLOUD_SQL_PROXY_VERSION = '1.16'


def _CreateCloudSqlProxyContainer(instances, secret_path):
  return {
      'name': _CLOUD_PROXY_CONTAINER_NAME,
      'image': 'gcr.io/cloudsql-docker/gce-proxy:' + _CLOUD_SQL_PROXY_VERSION,
      'command': ['/cloud_sql_proxy'],
      'args': [
          '-dir=/cloudsql', '-instances=' + ','.join(instances),
          '-credential_file=' + secret_path
      ],
      'volumeMounts': [{
          'name': 'cloudsql',
          'mountPath': '/cloudsql',
      }]
  }


_SECRET_TEMPLATE = """
apiVersion: v1
kind: Secret
metadata:
  name: local-development-credential
type: Opaque
"""


def CreateServiceAccountKey(service_account_name):
  """Create a service account key.

  Args:
    service_account_name: Name of service acccount.

  Returns:
    The contents of the generated private key file as a string.
  """
  default_credential_path = os.path.join(
      config.Paths().global_config_dir,
      _Utf8ToBase64(service_account_name) + '.json')
  credential_file_path = encoding.GetEncodedValue(os.environ,
                                                  'LOCAL_CREDENTIAL_PATH',
                                                  default_credential_path)
  if os.path.exists(credential_file_path):
    return files.ReadFileContents(credential_file_path)

  warning_msg = ('Creating a user-managed service account key for '
                 '{service_account_name}. This service account key will be '
                 'the default credential pointed to by '
                 'GOOGLE_APPLICATION_CREDENTIALS in the local development '
                 'environment. The user is responsible for the storage,'
                 'rotation, and deletion of this key. A copy of this key will '
                 'be stored at {local_key_path}.\n'
                 'Only use service accounts from a test project. Do not use '
                 'service accounts from a production project.').format(
                     service_account_name=service_account_name,
                     local_key_path=credential_file_path)
  console_io.PromptContinue(
      message=warning_msg, prompt_string='Continue?', cancel_on_no=True)

  service = apis.GetClientInstance('iam', 'v1')
  message_module = service.MESSAGES_MODULE

  create_key_request = (
      message_module.IamProjectsServiceAccountsKeysCreateRequest(
          name=service_account_name,
          createServiceAccountKeyRequest=message_module
          .CreateServiceAccountKeyRequest(
              privateKeyType=message_module.CreateServiceAccountKeyRequest
              .PrivateKeyTypeValueValuesEnum.TYPE_GOOGLE_CREDENTIALS_FILE)))
  key = service.projects_serviceAccounts_keys.Create(create_key_request)

  files.WriteFileContents(credential_file_path, key.privateKeyData)

  return six.ensure_text(key.privateKeyData)


def LocalDevelopmentSecretSpec(key):
  """Create a kubernetes yaml spec for a secret.

  Args:
    key: (str) The private key as a JSON string.

  Returns:
    Dictionary representing yaml dictionary.
  """
  yaml_config = yaml.load(_SECRET_TEMPLATE)
  yaml_config['data'] = {
      'local_development_service_account.json': _Utf8ToBase64(key)
  }
  return yaml_config


def _AddSecretVolume(volumes, secret_name):
  _AddSecretVolumeByName(
      volumes, secret_name=secret_name, volume_name=secret_name)


def _AddSecretVolumeByName(volumes, secret_name, volume_name, items=None):
  """Add a secret volume to a list of volumes.

  Args:
    volumes: (list[dict]) List of volume specifications.
    secret_name: (str) Name of the secret.
    volume_name: (str) Name of the volume to add.
    items: (list[_SecretPath]) Optional list of SecretPaths to map on the
      filesystem.
  """
  if any(volume['name'] == volume_name for volume in volumes):
    return
  items_lst = [
      RUN_MESSAGES_MODULE.KeyToPath(key=secpath.key, path=secpath.path)
      for secpath in (items or [])
  ]
  volumes.append(
      encoding_helper.MessageToDict(
          RUN_MESSAGES_MODULE.Volume(
              name=volume_name,
              secret=RUN_MESSAGES_MODULE.SecretVolumeSource(
                  secretName=secret_name, items=items_lst))))


def _AddSecretVolumeMount(mounts, secret_name):
  """Add a secret volume mount.

  Args:
    mounts: (list[dict]) List of volume mount dictionaries.
    secret_name: (str) Name of the secret.
  """
  _AddVolumeMount(
      mounts,
      mount_name=secret_name,
      mount_path='/etc/' + secret_name.replace('-', '_'))


def _AddVolumeMount(mounts, mount_name, mount_path):
  if any(mount_name == mount['name'] for mount in mounts):
    return
  mount = RUN_MESSAGES_MODULE.VolumeMount(
      name=mount_name, mountPath=mount_path, readOnly=True)
  mounts.append(encoding_helper.MessageToDict(mount))


def _AddSecretEnvVar(envs, path):
  """Adds a environmental variable that points to the secret file.

  Add a environment varible where GOOGLE_APPLICATION_CREDENTIALS is the name
  and the path to the secret file is the value.

  Args:
    envs: (list[dict]) List of dictionaries with a name entry and value entry.
    path: (str) Path to secret.
  """
  if 'GOOGLE_APPLICATION_CREDENTIALS' not in (var['name'] for var in envs):
    envs.append({'name': 'GOOGLE_APPLICATION_CREDENTIALS', 'value': path})


def _Utf8ToBase64(s):
  """Encode a utf-8 string as a base 64 string."""
  return six.ensure_text(base64.b64encode(six.ensure_binary(s)))


def _BuildReachableSecret(name):
  new_name = name.replace('_', '.').lower()
  u = str(uuid.uuid1())
  # make sure name is sufficiently small to fit the UUID
  new_name = new_name[:253 - len(u)] + u
  mapped = None
  if new_name != name:
    mapped = name
  return new_name, mapped