File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/run/config_changes.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class for representing various changes to a Configuration."""
from __future__ import absolute_import
from __future__ import annotations
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import abc
import argparse
import collections
from collections.abc import Collection, Container, Iterable, Mapping, MutableMapping
import copy
import dataclasses
import itertools
import json
import types
from typing import Any, ClassVar
from googlecloudsdk.api_lib.run import container_resource
from googlecloudsdk.api_lib.run import job
from googlecloudsdk.api_lib.run import k8s_object
from googlecloudsdk.api_lib.run import revision
from googlecloudsdk.api_lib.run import service
from googlecloudsdk.api_lib.run import worker_pool
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.run import exceptions
from googlecloudsdk.command_lib.run import name_generator
from googlecloudsdk.command_lib.run import platforms
from googlecloudsdk.command_lib.run import secrets_mapping
from googlecloudsdk.command_lib.run import volumes
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.command_lib.util.args import repeated
from googlecloudsdk.generated_clients.apis.run.v1 import run_v1_messages
import six
class ConfigChanger(six.with_metaclass(abc.ABCMeta, object)):
"""An abstract class representing configuration changes."""
@property
@abc.abstractmethod
def adjusts_template(self):
"""Returns True if any template-level changes should be made."""
@abc.abstractmethod
def Adjust(self, resource):
"""Adjust the given Service configuration.
Args:
resource: the k8s_object to adjust.
Returns:
A k8s_object that reflects applying the requested update.
May be resource after a mutation or a different object.
"""
class NonTemplateConfigChanger(ConfigChanger):
"""An abstract class representing non-template configuration changes."""
@property
def adjusts_template(self):
return False
class TemplateConfigChanger(ConfigChanger):
"""An abstract class representing template configuration changes."""
@property
def adjusts_template(self):
return True
@dataclasses.dataclass(frozen=True)
class ContainerConfigChanger(TemplateConfigChanger):
"""An abstract class representing container configuration changes.
Attributes:
container_name: Name of the container to modify. If None the primary
container is modified.
"""
container_name: str | None = None
@abc.abstractmethod
def AdjustContainer(
self,
container: container_resource.Container,
messages_mod: types.ModuleType,
):
"""Mutate the given container.
This method is called by this class's Adjust method and should apply the
desired changes directly to container.
Args:
container: the container to adjust.
messages_mod: Run v1 messages module.
"""
def Adjust(self, resource: container_resource.ContainerResource):
"""Returns a modified resource.
Adjusts resource by applying changes to the container specified by
self.container_name if present or the primary container otherwise. Calls
AdjustContainer to apply changes to the selected container.
Args:
resource: The resoure to modify.
"""
if self.container_name is not None:
container = resource.template.containers[self.container_name]
else:
container = resource.template.container
self.AdjustContainer(container, resource.MessagesModule())
return resource
def WithChanges(resource, changes):
"""Apply ConfigChangers to resource.
It's undefined whether the input resource is modified.
Args:
resource: KubernetesObject, probably a Service.
changes: List of ConfigChangers.
Returns:
Changed resource.
"""
for config_change in changes:
resource = config_change.Adjust(resource)
return resource
def AdjustsTemplate(changes):
"""Returns True if there's any template-level changes."""
return any([c.adjusts_template for c in changes])
@dataclasses.dataclass(frozen=True)
class LabelChanges(ConfigChanger):
"""Represents the user intent to modify metadata labels.
Attributes:
diff: Label diff to apply.
copy_to_revision: A boolean indicating that label changes should be copied
to the resource's template.
"""
_LABELS_NOT_ALLOWED_IN_REVISION: ClassVar[frozenset[str]] = frozenset(
[service.ENDPOINT_VISIBILITY]
)
diff: labels_util.Diff
copy_to_revision: bool = True
@property
def adjusts_template(self):
return self.copy_to_revision
def Adjust(self, resource):
# Currently assumes all "system"-owned labels are applied by the control
# plane and it's ok for us to clear them on the client.
update_result = self.diff.Apply(
k8s_object.Meta(resource.MessagesModule()).LabelsValue,
resource.metadata.labels,
)
maybe_new_labels = update_result.GetOrNone()
if maybe_new_labels:
resource.metadata.labels = maybe_new_labels
# For job, resource.template points to task template which has no
# metadata. Use job specific execution_template instead.
template = (
resource.execution_template
if hasattr(resource, 'execution_template')
else resource.template
)
if self.copy_to_revision and hasattr(template, 'labels'):
# Service labels are the source of truth and *overwrite* revision labels
# See go/run-labels-prd for deets.
# However, we need to preserve the nonce if there is one.
nonce = template.labels.get(revision.NONCE_LABEL)
template.metadata.labels = copy.deepcopy(maybe_new_labels)
for label_to_remove in self._LABELS_NOT_ALLOWED_IN_REVISION:
if label_to_remove in template.labels:
del template.labels[label_to_remove]
if nonce:
template.labels[revision.NONCE_LABEL] = nonce
return resource
class JobNonceChange(TemplateConfigChanger):
"""Adds a new nonce to the job template, for forcing an image pull."""
def Adjust(self, resource):
resource.execution_template.labels[job.NONCE_LABEL] = (
name_generator.GenerateName(3, separator='_')
)
return resource
@dataclasses.dataclass(frozen=True)
class ReplaceJobChange(NonTemplateConfigChanger):
"""Represents the user intent to replace the job.
Attributes:
new_job: New job that will replace the existing job.
"""
new_job: job.Job
def Adjust(self, resource):
"""Returns a replacement for resource.
The returned job is the job provided to the constructor. If
resource.metadata.resourceVersion is not empty, has metadata.resourceVersion
of returned job set to this value.
Args:
resource: job.Job, The job to adjust.
"""
if resource.metadata.resourceVersion:
self.new_job.metadata.resourceVersion = resource.metadata.resourceVersion
return self.new_job
@dataclasses.dataclass(frozen=True)
class ReplaceServiceChange(NonTemplateConfigChanger):
"""Represents the user intent to replace the service.
Attributes:
new_service: New service that will replace the existing service.
"""
new_service: service.Service
def Adjust(self, resource):
"""Returns a replacement for resource.
The returned service is the service provided to the constructor. If
resource.metadata.resourceVersion is not empty, has metadata.resourceVersion
of returned service set to this value.
Args:
resource: service.Service, The service to adjust.
"""
if resource.metadata.resourceVersion:
self.new_service.metadata.resourceVersion = (
resource.metadata.resourceVersion
)
# Knative will complain if you try to edit (incl remove) serving annots.
# So replicate them here.
for k, v in resource.annotations.items():
if k.startswith(k8s_object.SERVING_GROUP):
self.new_service.annotations[k] = v
return self.new_service
@dataclasses.dataclass(frozen=True)
class ReplaceWorkerPoolChange(NonTemplateConfigChanger):
"""Represents the user intent to replace the worker pool.
Attributes:
new_worker_pool: New worker pool that will replace the existing worker pool.
"""
new_worker_pool: worker_pool.WorkerPool
def Adjust(self, resource):
"""Returns a replacement for resource.
The returned worker pool is the worker pool provided to the constructor. If
resource.metadata.resourceVersion is not empty, has metadata.resourceVersion
of returned worker pool set to this value.
Args:
resource: worker_pool.WorkerPool, The worker pool to adjust.
"""
if resource.metadata.resourceVersion:
self.new_worker_pool.metadata.resourceVersion = (
resource.metadata.resourceVersion
)
return self.new_worker_pool
@dataclasses.dataclass(frozen=True, init=False)
class EndpointVisibilityChange(LabelChanges):
"""Represents the user intent to modify the endpoint visibility.
Only applies to Cloud Run for Anthos.
"""
endpoint_visibility: dataclasses.InitVar[bool]
def __init__(self, endpoint_visibility):
"""Determine label changes for modifying endpoint visibility.
Args:
endpoint_visibility: bool, True if Cloud Run on GKE service should only be
addressable from within the cluster. False if it should be publicly
addressable.
"""
if endpoint_visibility:
diff = labels_util.Diff(
additions={service.ENDPOINT_VISIBILITY: service.CLUSTER_LOCAL}
)
else:
diff = labels_util.Diff(subtractions=[service.ENDPOINT_VISIBILITY])
# Don't copy this label to the revision because it's not supported there.
# See b/154664962.
super().__init__(diff, False)
@dataclasses.dataclass(frozen=True)
class SetAnnotationChange(NonTemplateConfigChanger):
"""Represents the user intent to set an annotation.
Attributes:
key: Annotation to set.
value: Annotation value to set.
"""
key: str
value: str
def Adjust(self, resource):
resource.annotations[self.key] = self.value
return resource
@dataclasses.dataclass(frozen=True)
class DeleteAnnotationChange(NonTemplateConfigChanger):
"""Represents the user intent to delete an annotation.
Attributes:
key: Annotation to delete.
"""
key: str
def Adjust(self, resource):
annotations = resource.annotations
if self.key in annotations:
del annotations[self.key]
return resource
@dataclasses.dataclass(frozen=True)
class SetTemplateAnnotationChange(TemplateConfigChanger):
"""Represents the user intent to set a template annotation.
Attributes:
key: Template annotation to set.
value: Annotation value to set.
"""
key: str
value: str
def Adjust(self, resource):
resource.template.annotations[self.key] = self.value
return resource
@dataclasses.dataclass(frozen=True)
class DeleteTemplateAnnotationChange(TemplateConfigChanger):
"""Represents the user intent to delete a template annotation.
Attributes:
key: Template annotation to delete.
"""
key: str
def Adjust(self, resource):
annotations = resource.template.annotations
if self.key in annotations:
del annotations[self.key]
return resource
@dataclasses.dataclass(frozen=True)
class BaseImagesAnnotationChange(TemplateConfigChanger):
"""Represents the user intent to update the 'base-images' template annotation.
The value of the annotation is a string representation of a json map of
container_name -> base_image_url. E.g.: '{"mycontainer":"my_base_image_url"}'.
Attributes:
updates: {container:url} map of values that need to be added/updated
deletes: List of containers whose base image url needs to be deleted.
"""
updates: dict[str, str] = dataclasses.field(default_factory=dict)
deletes: list[str] = dataclasses.field(default_factory=list)
def _mergeBaseImageUrls(
self,
resource: revision.Revision,
existing_base_image_urls: dict[str, str],
updates: dict[str, str],
deletes: list[str],
):
if deletes:
for container in deletes:
if container in existing_base_image_urls:
del existing_base_image_urls[container]
if updates:
for container, url in updates.items():
existing_base_image_urls[container] = url
return self._constructBaseImageUrls(resource, existing_base_image_urls)
def _constructBaseImageUrls(
self, resource: revision.Revision, urls: dict[str, str]
):
containers = frozenset(
[x or '' for x in resource.template.containers.keys()]
)
base_images_str = ', '.join(
f'"{key}":"{value}"' for key, value in urls.items() if key in containers
)
return '{' + base_images_str + '}' if base_images_str else ''
def Adjust(self, resource: revision.Revision):
"""Updates the revision to use automatic base image updates."""
annotations = resource.template.annotations
existing_value = annotations.get(revision.BASE_IMAGES_ANNOTATION, '')
if existing_value:
existing_base_image_urls = json.loads(existing_value)
new_value = self._mergeBaseImageUrls(
resource, existing_base_image_urls, self.updates, self.deletes
)
else:
new_value = self._constructBaseImageUrls(resource, self.updates)
if new_value:
resource.template.annotations[revision.BASE_IMAGES_ANNOTATION] = new_value
resource.template.spec.runtimeClassName = (
revision.BASE_IMAGE_UPDATE_RUNTIME_CLASS_NAME
)
elif revision.BASE_IMAGES_ANNOTATION in annotations:
del resource.template.annotations[revision.BASE_IMAGES_ANNOTATION]
if (
resource.template.spec.runtimeClassName
== revision.BASE_IMAGE_UPDATE_RUNTIME_CLASS_NAME
):
resource.template.spec.runtimeClassName = ''
return resource
@dataclasses.dataclass(frozen=True)
class SourcesAnnotationChange(TemplateConfigChanger):
"""Represents the user intent to update the 'sources' template annotation.
The value of the annotation is a string representation of a json map of
container_name -> GCS objects. E.g.: '{"foo":"gs://my-bucket/my-object"}'.
Attributes:
updates: {container:url} map of values that need to be added/updated
deletes: List of containers whose source needs to be deleted.
"""
updates: dict[str, str] = dataclasses.field(default_factory=dict)
deletes: list[str] = dataclasses.field(default_factory=list)
def _mergeSources(
self,
resource: revision.Revision,
existing_sources: dict[str, str],
updates: dict[str, str],
deletes: list[str],
):
if deletes:
for container in deletes:
if container in existing_sources:
del existing_sources[container]
if updates:
for container, url in updates.items():
existing_sources[container] = url
return self._constructSources(resource, existing_sources)
def _constructSources(
self, resource: revision.Revision, urls: dict[str, str]
):
containers = frozenset(
[x or '' for x in resource.template.containers.keys()]
)
return json.dumps(
{x: y for x, y in urls.items() if x in containers},
separators=(',', ':'),
)
def Adjust(self, resource: revision.Revision):
"""Updates the revision to use zip deploys."""
annotations = resource.template.annotations
existing_value = annotations.get(revision.SOURCES_ANNOTATION, '')
if existing_value:
existing_sources = json.loads(existing_value)
new_value = self._mergeSources(
resource, existing_sources, self.updates, self.deletes
)
else:
new_value = self._constructSources(resource, self.updates)
if new_value and new_value != '{}':
resource.template.annotations[revision.SOURCES_ANNOTATION] = new_value
elif revision.SOURCES_ANNOTATION in annotations:
del resource.template.annotations[revision.SOURCES_ANNOTATION]
return resource
@dataclasses.dataclass(frozen=True)
class IngressContainerBaseImagesAnnotationChange(BaseImagesAnnotationChange):
"""Represents the user intent to update the 'base-images' template annotation.
The value of the annotation is a string representation of a json map of
container_name -> base_image_url. E.g.: '{"mycontainer":"my_base_image_url"}'.
This class changes the base image annotation for the default container, which
is either the container in a service with one container or the one with a port
set in a service with multiple containers.
Attributes:
base_image: url of the base image for the default container or None
"""
base_image: str | None = None
def Adjust(self, resource: revision.Revision):
"""Updates the revision to use automatic base image updates."""
if self.base_image:
self.updates[resource.template.container.name or ''] = self.base_image
else:
self.deletes.append(resource.template.container.name or '')
return super().Adjust(resource)
@dataclasses.dataclass(frozen=True)
class SetLaunchStageAnnotationChange(NonTemplateConfigChanger):
"""Sets launch stage annotation on a resource.
Attributes:
launch_stage: The launch stage to set.
"""
launch_stage: base.ReleaseTrack
def Adjust(self, resource):
if self.launch_stage == base.ReleaseTrack.GA:
return resource
else:
resource.annotations[k8s_object.LAUNCH_STAGE_ANNOTATION] = (
self.launch_stage.id
)
return resource
@dataclasses.dataclass(frozen=True)
class SetRegionsAnnotationChange(NonTemplateConfigChanger):
"""Sets regions annotation on a resource.
Attributes:
regions: A comma-separated list of regions.
"""
regions: str
def Adjust(self, resource):
resource.annotations[k8s_object.MULTI_REGION_REGIONS_ANNOTATION] = (
self.regions
)
return resource
@dataclasses.dataclass(frozen=True)
class RegionsChangeAnnotationChange(NonTemplateConfigChanger):
"""Adds or removes regions annotation on an existing service.
Attributes:
existing: the existing Service.
to_add: A comma-separated list of regions to add to existing.
to_remove: A comma-separated list of regions to remove from existing.
"""
to_add: str
to_remove: str
def Adjust(self, resource):
final_list = self.GetFinalList(resource)
resource.annotations[k8s_object.MULTI_REGION_REGIONS_ANNOTATION] = ','.join(
final_list
)
return resource
def GetFinalList(self, resource):
"""Returns the final list of regions after applying the changes."""
annotation = (
resource.annotations.get(k8s_object.MULTI_REGION_REGIONS_ANNOTATION)
or None
)
existing = annotation.split(',') if annotation else []
to_add = self.to_add.split(',') if self.to_add else []
to_remove = self.to_remove.split(',') if self.to_remove else []
final_list = [x for x in existing if x not in to_remove]
final_list.extend([x for x in to_add if x not in existing])
return final_list
@dataclasses.dataclass(frozen=True)
class MultiRegionDomainNameChange(NonTemplateConfigChanger):
"""Sets a multi-region domain name annotation on the service.
Attributes:
domain_name: The sandbox annotation value to set.
"""
domain_name: str
def Adjust(self, resource):
resource.annotations[k8s_object.GCLB_DOMAIN_NAME_ANNOTATION] = (
self.domain_name
)
return resource
@dataclasses.dataclass(frozen=True)
class SetClientNameAndVersionAnnotationChange(ConfigChanger):
"""Sets the client name and version annotations.
Attributes:
client_name: Client name to set.
client_version: Client version to set.
set_on_template: A boolean indicating whether the client name and version
annotations should be set on the resource template as well.
"""
client_name: str
client_version: str
set_on_template: bool = True
@property
def adjusts_template(self):
return self.set_on_template
def Adjust(self, resource):
if self.client_name is not None:
resource.annotations[k8s_object.CLIENT_NAME_ANNOTATION] = self.client_name
if self.set_on_template and hasattr(resource.template, 'annotations'):
resource.template.annotations[k8s_object.CLIENT_NAME_ANNOTATION] = (
self.client_name
)
if self.client_version is not None:
resource.annotations[k8s_object.CLIENT_VERSION_ANNOTATION] = (
self.client_version
)
if self.set_on_template and hasattr(resource.template, 'annotations'):
resource.template.annotations[k8s_object.CLIENT_VERSION_ANNOTATION] = (
self.client_version
)
return resource
@dataclasses.dataclass(frozen=True)
class SandboxChange(TemplateConfigChanger):
"""Sets a sandbox annotation on the service.
Attributes:
sandbox: The sandbox annotation value to set.
"""
sandbox: str
def Adjust(self, resource):
resource.template.annotations[container_resource.SANDBOX_ANNOTATION] = (
self.sandbox
)
return resource
@dataclasses.dataclass(frozen=True)
class VpcConnectorChange(TemplateConfigChanger):
"""Sets a VPC connector annotation on the service.
Attributes:
connector_name: The VPC connector name to set in the annotation.
"""
connector_name: str
def Adjust(self, resource):
resource.template.annotations[container_resource.VPC_ACCESS_ANNOTATION] = (
self.connector_name
)
return resource
class ClearVpcConnectorChange(TemplateConfigChanger):
"""Clears a VPC connector annotation on the service."""
def Adjust(self, resource):
annotations = resource.template.annotations
if container_resource.VPC_ACCESS_ANNOTATION in annotations:
del annotations[container_resource.VPC_ACCESS_ANNOTATION]
if container_resource.EGRESS_SETTINGS_ANNOTATION in annotations:
del annotations[container_resource.EGRESS_SETTINGS_ANNOTATION]
return resource
@dataclasses.dataclass(init=False, frozen=True)
class ImageChange(ContainerConfigChanger):
"""A Cloud Run container deployment.
Attributes:
image: The image to set in the adjusted container.
"""
image: str
def __init__(self, image, **kwargs):
super().__init__(**kwargs)
object.__setattr__(self, 'image', image)
def AdjustContainer(self, container, messages_mod):
container.image = self.image
def _PruneMapping(
mapping: MutableMapping[str, str],
keys_to_remove: Collection[str],
clear_others: bool,
):
if clear_others:
mapping.clear()
else:
for var_or_path in keys_to_remove:
if var_or_path in mapping:
del mapping[var_or_path]
def _PruneManagedVolumeMapping(
resource,
res_volumes,
volume_mounts: MutableMapping[str, str],
removes: Collection[str],
clear_others: bool,
external_mounts: Container[str],
):
"""Remove the specified volume mappings from the config."""
if clear_others:
volume_mounts.clear()
else:
for remove in removes:
mount, path = remove.rsplit('/', 1)
if mount in volume_mounts:
volume_name = volume_mounts[mount]
if volume_name in external_mounts:
volume_name = _CopyToNewVolume(
resource,
volume_name,
mount,
copy.deepcopy(res_volumes[volume_name]),
res_volumes,
volume_mounts,
)
new_paths = []
for key_to_path in res_volumes[volume_name].items:
if path != key_to_path.path:
new_paths.append(key_to_path)
if not new_paths:
# there are no more versions in the volume
del volume_mounts[mount]
else:
res_volumes[volume_name].items = new_paths
def _CopyToNewVolume(
resource,
volume_name,
mount_point,
volume_source,
res_volumes,
volume_mounts,
):
"""Copies existing volume to volume with a new name."""
new_volume_name = _UniqueVolumeName(
volume_source.secretName, resource.template.volumes
)
try:
volume_mounts[mount_point] = new_volume_name
except KeyError:
raise exceptions.ConfigurationError(
'Cannot update mount [{}] because its mounted volume '
'is of a different source type.'.format(mount_point)
)
# the volume does not exist so we need a new one
new_paths = {item.path for item in volume_source.items}
old_volume = res_volumes[volume_name]
for item in old_volume.items:
if item.path not in new_paths:
volume_source.items.append(item)
res_volumes[new_volume_name] = volume_source
return new_volume_name
@dataclasses.dataclass(frozen=True)
class EnvVarLiteralChanges(ContainerConfigChanger):
"""Represents the user intent to modify environment variables string literals.
Attributes:
updates: Updated env var names and values to set.
removes: Env vars to remove.
clear_others: If true clear all non-updated env vars.
"""
updates: Mapping[str, str] = dataclasses.field(default_factory=dict)
removes: Collection[str] = dataclasses.field(default_factory=list)
clear_others: bool = False
def AdjustContainer(self, container, messages_mod):
"""Mutates the given config's env vars to match the desired changes.
Args:
container: container to adjust
messages_mod: messages module
Returns:
The adjusted container
Raises:
ConfigurationError if there's an attempt to replace the source of an
existing environment variable whose source is of a different type
(e.g. env var's secret source can't be replaced with a config map
source).
"""
_PruneMapping(container.env_vars.literals, self.removes, self.clear_others)
try:
container.env_vars.literals.update(self.updates)
except KeyError as e:
raise exceptions.ConfigurationError(
'Cannot update environment variable [{}] to string literal '
'because it has already been set with a different type.'.format(
e.args[0]
)
)
@dataclasses.dataclass(frozen=True)
class SecretEnvVarChanges(TemplateConfigChanger):
"""Represents the user intent to modify environment variable secrets.
Attributes:
updates: Env var names and values to update.
removes: List of env vars to remove.
clear_others: If true clear all non-updated env vars.
container_name: Name of the container to update. If None, the resource's
primary container is update.
"""
updates: Mapping[str, secrets_mapping.ReachableSecret]
removes: Collection[str]
clear_others: bool
container_name: str | None = None
def Adjust(self, resource):
"""Mutates the given config's env vars to match the desired changes.
Args:
resource: k8s_object to adjust
Returns:
The adjusted resource
Raises:
ConfigurationError if there's an attempt to replace the source of an
existing environment variable whose source is of a different type
(e.g. env var's secret source can't be replaced with a config map
source).
"""
if self.container_name:
env_vars = resource.template.containers[
self.container_name
].env_vars.secrets
else:
env_vars = resource.template.env_vars.secrets
_PruneMapping(env_vars, self.removes, self.clear_others)
for name, reachable_secret in self.updates.items():
try:
env_vars[name] = reachable_secret.AsEnvVarSource(resource)
except KeyError:
raise exceptions.ConfigurationError(
'Cannot update environment variable [{}] to the given type '
'because it has already been set with a different type.'.format(
name
)
)
secrets_mapping.PruneAnnotation(resource)
return resource
class ConfigMapEnvVarChanges(TemplateConfigChanger):
"""Represents the user intent to modify environment variable config maps."""
def __init__(self, updates, removes, clear_others):
"""Initialize a new ConfigMapEnvVarChanges object.
Args:
updates: {str, str}, Update env var names and values.
removes: [str], List of env vars to remove.
clear_others: bool, If true, clear all non-updated env vars.
Raises:
ConfigurationError if a key hasn't been provided for a source.
"""
super().__init__()
self._updates = {}
for name, v in updates.items():
# Split the given values into 2 parts:
# [env var source name, source data item key]
value = v.split(':', 1)
if len(value) < 2:
value.append(self._OmittedSecretKeyDefault(name))
self._updates[name] = value
self._removes = removes
self._clear_others = clear_others
def _OmittedSecretKeyDefault(self, name):
if platforms.IsManaged():
return 'latest'
raise exceptions.ConfigurationError(
'Missing required item key for environment variable [{}].'.format(name)
)
def Adjust(self, resource):
"""Mutates the given config's env vars to match the desired changes.
Args:
resource: k8s_object to adjust
Returns:
The adjusted resource
Raises:
ConfigurationError if there's an attempt to replace the source of an
existing environment variable whose source is of a different type
(e.g. env var's secret source can't be replaced with a config map
source).
"""
env_vars = resource.template.env_vars.config_maps
_PruneMapping(env_vars, self._removes, self._clear_others)
for name, (source_name, source_key) in self._updates.items():
try:
env_vars[name] = self._MakeEnvVarSource(
resource.MessagesModule(), source_name, source_key
)
except KeyError:
raise exceptions.ConfigurationError(
'Cannot update environment variable [{}] to the given type '
'because it has already been set with a different type.'.format(
name
)
)
return resource
def _MakeEnvVarSource(self, messages, name, key):
return messages.EnvVarSource(
configMapKeyRef=messages.ConfigMapKeySelector(name=name, key=key)
)
@dataclasses.dataclass(frozen=True)
class ResourceChanges(ContainerConfigChanger):
"""Represents the user intent to update resource limits.
Attributes:
memory: Updated memory limit to set in the container. Specified as string
ending in 'Mi' or 'Gi'. If None the memory limit is not changed.
cpu: Updated cpu limit to set in the container if not None.
gpu: Updated gpu limit to set in the container if not None.
"""
memory: str | None = None
cpu: str | None = None
gpu: str | None = None
def AdjustContainer(self, container, messages_mod):
"""Mutates the given config's resource limits to match what's desired."""
if self.memory is not None:
container.resource_limits['memory'] = self.memory
if self.cpu is not None:
container.resource_limits['cpu'] = self.cpu
if self.gpu is not None:
if self.gpu == '0':
container.resource_limits.pop('nvidia.com/gpu', None)
else:
container.resource_limits['nvidia.com/gpu'] = self.gpu
def _MakeProbe(
messages: types.ModuleType, settings: dict[str, str]
) -> run_v1_messages.Probe:
"""Creates a probe from the given settings.
Args:
messages: Run v1 messages module.
settings: a dict of settings for the probe.
Returns:
A new Run v1 probe.
"""
def _ParseInt(settings, key):
if not settings[key]:
return None
try:
return int(settings[key])
except ValueError:
raise exceptions.ArgumentError(
'Value for key [{}] must be an integer.'.format(key)
)
probe = messages.Probe()
for key in settings:
if key.startswith('tcpSocket'):
probe.tcpSocket = messages.TCPSocketAction()
elif key.startswith('httpGet'):
probe.httpGet = messages.HTTPGetAction()
elif key.startswith('grpc'):
probe.grpc = messages.GRPCAction()
else:
# Set the basic fields directly.
setattr(probe, key, _ParseInt(settings, key))
# TCP
if 'tcpSocket.port' in settings:
probe.tcpSocket.port = _ParseInt(settings, 'tcpSocket.port')
# HTTP
if 'httpGet.port' in settings:
probe.httpGet.port = _ParseInt(settings, 'httpGet.port')
if 'httpGet.path' in settings:
probe.httpGet.path = settings['httpGet.path']
# gRPC
if 'grpc.port' in settings:
probe.grpc.port = _ParseInt(settings, 'grpc.port')
if 'grpc.service' in settings:
probe.grpc.service = settings['grpc.service']
return probe
@dataclasses.dataclass(frozen=True)
class StartupProbeChanges(ContainerConfigChanger):
"""Represents the user intent to update startup probe settings.
Attributes:
settings: Values to set in the probe.
clear: If true, clear the startup probe.
"""
settings: dict[str, str] = dataclasses.field(default_factory=dict)
clear: bool = False
def AdjustContainer(self, container, messages_mod):
if self.clear:
container.startupProbe = None
return
container.startupProbe = _MakeProbe(messages_mod, self.settings)
@dataclasses.dataclass(frozen=True)
class LivenessProbeChanges(ContainerConfigChanger):
"""Represents the user intent to update liveness probe settings.
Attributes:
settings: values to set in the probe.
clear: If true, clear the liveness probe.
"""
settings: dict[str, str] = dataclasses.field(default_factory=dict)
clear: bool = False
def AdjustContainer(self, container, messages_mod):
if self.clear:
container.livenessProbe = None
return
container.livenessProbe = _MakeProbe(messages_mod, self.settings)
@dataclasses.dataclass(frozen=True)
class ReadinessProbeChanges(ContainerConfigChanger):
"""Represents the user intent to update readiness probe settings.
Attributes:
settings: values to set in the probe.
clear: If true, clear the readiness probe.
"""
settings: dict[str, str] = dataclasses.field(default_factory=dict)
clear: bool = False
def AdjustContainer(self, container, messages_mod):
if self.clear:
container.readinessProbe = None
return
container.readinessProbe = _MakeProbe(messages_mod, self.settings)
@dataclasses.dataclass(frozen=True)
class CloudSQLChanges(TemplateConfigChanger):
"""Represents the intent to update the Cloug SQL instances.
Attributes:
project: Project to use as the default project for Cloud SQL instances.
region: Region to use as the default region for Cloud SQL instances
args: Args to the command.
"""
add_cloudsql_instances: list[str]
remove_cloudsql_instances: list[str]
set_cloudsql_instances: list[str]
clear_cloudsql_instances: bool | None = None
@classmethod
def FromArgs(
cls,
project: str | None = None,
region: str | None = None,
*,
args: argparse.Namespace,
):
"""Returns a CloudSQLChanges object from the given args.
Args:
project: Optional project. If absent project must be specified in each
Cloud SQL instance.
region: Optional region. If absent region must be specified in each Cloud
SQL instance.
args: Command line args to parse CloudSQL flags from.
"""
def AugmentArgs(arg_name):
val = getattr(args, arg_name, None)
if val is None:
return None
return [Augment(i) for i in val]
def Augment(instance_str):
instance = instance_str.split(':')
if len(instance) == 3:
return ':'.join(instance)
elif len(instance) == 1:
if not project:
raise exceptions.CloudSQLError(
'To specify a Cloud SQL instance by plain name, you must specify'
' a project.'
)
if not region:
raise exceptions.CloudSQLError(
'To specify a Cloud SQL instance by plain name, you must be '
'deploying to a managed Cloud Run region.'
)
return ':'.join(itertools.chain([project, region], instance))
else:
raise exceptions.CloudSQLError(
'Malformed CloudSQL instance string: {}'.format(instance_str)
)
# Augment args so each cloudsql instance gets the region and project.
return cls(
add_cloudsql_instances=AugmentArgs('add_cloudsql_instances'),
remove_cloudsql_instances=AugmentArgs('remove_cloudsql_instances'),
set_cloudsql_instances=AugmentArgs('set_cloudsql_instances'),
clear_cloudsql_instances=getattr(
args, 'clear_cloudsql_instances', None
),
)
def Adjust(self, resource):
def GetCurrentInstances():
annotation_val = resource.template.annotations.get(
container_resource.CLOUDSQL_ANNOTATION
)
if annotation_val:
return annotation_val.split(',')
return []
instances = repeated.ParsePrimitiveArgs(
self, 'cloudsql-instances', GetCurrentInstances
)
if instances is not None:
resource.template.annotations[container_resource.CLOUDSQL_ANNOTATION] = (
','.join(instances)
)
return resource
@dataclasses.dataclass(frozen=True)
class ConcurrencyChanges(TemplateConfigChanger):
"""Represents the user intent to update concurrency preference.
Attributes:
concurrency: The concurrency value to set in the resource template. If None
concurrency is cleared.
"""
concurrency: int | None = None
@classmethod
def FromFlag(cls, concurrency):
"""Returns a ConcurrencyChanges object from the --concurrency flag value.
Args:
concurrency: The concurrency flag value. If 'default' concurrency is
cleared, otherwise should be an integer concurrency value to set.
"""
return cls(None if concurrency == 'default' else int(concurrency))
def Adjust(self, resource):
"""Mutates the given config's resource limits to match what's desired."""
resource.template.concurrency = self.concurrency
return resource
@dataclasses.dataclass(frozen=True)
class TimeoutChanges(TemplateConfigChanger):
"""Represents the user intent to update request duration.
Attributes:
timeout: The timeout to set in the resource template.
"""
timeout: str
def Adjust(self, resource):
"""Mutates the given config's timeout to match what's desired."""
resource.template.timeout = self.timeout
return resource
@dataclasses.dataclass(frozen=True)
class ServiceAccountChanges(TemplateConfigChanger):
"""Represents the user intent to change service account for the revision.
Attributes:
service_account: The service account to set.
"""
service_account: str
def Adjust(self, resource):
"""Mutates the given config's service account to match what's desired."""
resource.template.service_account = self.service_account
return resource
_MAX_RESOURCE_NAME_LENGTH = 63
@dataclasses.dataclass(frozen=True)
class RevisionNameChanges(TemplateConfigChanger):
"""Represents the user intent to change revision name.
Attributes:
revision_suffix: Suffix to append to the revision name.
"""
revision_suffix: str
def Adjust(self, resource):
"""Mutates the given config's revision name to match what's desired."""
if not self.revision_suffix:
resource.template.name = ''
return resource
max_prefix_length = (
_MAX_RESOURCE_NAME_LENGTH - len(self.revision_suffix) - 1
)
resource.template.name = '{}-{}'.format(
resource.name[:max_prefix_length], self.revision_suffix
)
return resource
def GenerateVolumeName(prefix):
"""Randomly generated name with the given prefix."""
return name_generator.GenerateName(sections=2, separator='-', prefix=prefix)
def _UniqueVolumeName(source_name, existing_volumes):
"""Generate unique volume name.
The names that connect volumes and mounts must be unique even if their
source volume names match.
Args:
source_name: (Potentially clashing) name.
existing_volumes: Names in use.
Returns:
Unique name.
"""
volume_name = None
while volume_name is None or volume_name in existing_volumes:
volume_name = GenerateVolumeName(source_name)
return volume_name
def _PruneVolumes(mounted_volumes, res_volumes):
"""Delete all volumes no longer being mounted.
Args:
mounted_volumes: set of volumes mounted in any container
res_volumes: resource.template.volumes
"""
for volume in list(res_volumes):
if volume not in mounted_volumes:
del res_volumes[volume]
@dataclasses.dataclass(frozen=True)
class SecretVolumeChanges(TemplateConfigChanger):
"""Represents the user intent to change volumes with secret source types.
Attributes:
updates: Updates to mount path and volume fields.
removes: List of mount paths to remove.
clear_others: If true clear all non-updated volumes and mounts of the given
[volume_type].
container_name: Name of the container to update.
"""
updates: Mapping[str, secrets_mapping.ReachableSecret]
removes: Collection[str]
clear_others: bool
container_name: str | None = None
def _UpdateManagedVolumes(
self, resource, volume_mounts, res_volumes, external_mounts
):
"""Update volumes for Cloud Run. Ensure only one secret per directory."""
new_volumes = {}
volumes_to_mounts = collections.defaultdict(list)
for path, vol in volume_mounts.items():
volumes_to_mounts[vol].append(path)
for file_path, reachable_secret in self.updates.items():
mount_point = file_path.rsplit('/', 1)[0]
if mount_point in new_volumes:
if new_volumes[mount_point].secretName != reachable_secret.secret_name:
# we don't support subpaths in managed so if there's a second
# secret in the same directory, error.
raise exceptions.ConfigurationError(
'Cannot update secret at [{}] because a different secret is '
'already mounted in the same directory.'.format(file_path)
)
reachable_secret.AppendToSecretVolumeSource(
resource, new_volumes[mount_point]
)
else:
new_volumes[mount_point] = reachable_secret.AsSecretVolumeSource(
resource
)
for mount_point, volume_source in new_volumes.items():
if mount_point in volume_mounts:
volume_name = volume_mounts[mount_point]
if (
len(volumes_to_mounts[volume_name]) > 1
or volume_name in external_mounts
):
# the volume is used by more than one path, let's separate it into a
# separate volume
volumes_to_mounts[volume_name].remove(mount_point)
new_name = _CopyToNewVolume(
resource,
volume_name,
mount_point,
volume_source,
res_volumes,
volume_mounts,
)
volumes_to_mounts[new_name].append(mount_point)
continue
else:
volume = res_volumes[volume_name]
if volume.secretName != volume_source.secretName:
# only allow replacing the secret if all versions are replaced
existing_paths = {item.path for item in volume.items}
new_paths = {item.path for item in volume_source.items}
if not existing_paths.issubset(new_paths):
raise exceptions.ConfigurationError(
'Multiple secrets are specified for directory [{}]. Cloud Run'
' only supports one secret per directory'.format(mount_point)
)
else:
# we need to merge the two
new_paths = {item.path for item in volume_source.items}
for item in volume.items:
# copy over existing paths that are not overridden
if item.path not in new_paths:
volume_source.items.append(item)
else:
volume_name = _UniqueVolumeName(
volume_source.secretName, resource.template.volumes
)
try:
volume_mounts[mount_point] = volume_name
except KeyError:
raise exceptions.ConfigurationError(
'Cannot update mount [{}] because its mounted volume '
'is of a different source type.'.format(mount_point)
)
# the volume does not exist so we need a new one
res_volumes[volume_name] = volume_source
def Adjust(self, resource):
"""Mutates the given config's volumes to match the desired changes.
Args:
resource: k8s_object to adjust
Returns:
The adjusted resource
Raises:
ConfigurationError if there's an attempt to replace the volume a mount
points to whose existing volume has a source of a different type than
the new volume (e.g. mount that points to a volume with a secret source
can't be replaced with a volume that has a config map source).
"""
if self.container_name:
container = resource.template.containers[self.container_name]
else:
container = resource.template.container
volume_mounts = container.volume_mounts.secrets
res_volumes = resource.template.volumes.secrets
external_mounts = frozenset(
itertools.chain.from_iterable(
external_container.volume_mounts.secrets.values()
for name, external_container in resource.template.containers.items()
if name != container.name
)
)
if platforms.IsManaged():
_PruneManagedVolumeMapping(
resource,
res_volumes,
volume_mounts,
self.removes,
self.clear_others,
external_mounts,
)
else:
removes = self.removes
_PruneMapping(volume_mounts, removes, self.clear_others)
if platforms.IsManaged():
self._UpdateManagedVolumes(
resource, volume_mounts, res_volumes, external_mounts
)
else:
for file_path, reachable_secret in self.updates.items():
volume_name = _UniqueVolumeName(
reachable_secret.secret_name, resource.template.volumes
)
# volume_mounts is a special mapping that filters for the current kind
# of mount and KeyErrors on existing keys with other types.
try:
mount_point = file_path
volume_mounts[mount_point] = volume_name
except KeyError:
raise exceptions.ConfigurationError(
'Cannot update mount [{}] because its mounted volume '
'is of a different source type.'.format(file_path)
)
res_volumes[volume_name] = reachable_secret.AsSecretVolumeSource(
resource
)
_PruneVolumes(external_mounts.union(volume_mounts.values()), res_volumes)
secrets_mapping.PruneAnnotation(resource)
return resource
class ConfigMapVolumeChanges(TemplateConfigChanger):
"""Represents the user intent to change volumes with config map source types."""
def __init__(self, updates, removes, clear_others):
"""Initialize a new ConfigMapVolumeChanges object.
Args:
updates: {str, [str, str]}, Update mount path and volume fields.
removes: [str], List of mount paths to remove.
clear_others: bool, If true, clear all non-updated volumes and mounts of
the given [volume_type].
"""
super().__init__()
self._updates = {}
for k, v in updates.items():
# Split the given values into 2 parts:
# [volume source name, data item key]
update_value = v.split(':', 1)
# Pad with None if no data item key specified
if len(update_value) < 2:
update_value.append(None)
self._updates[k] = update_value
self._removes = removes
self._clear_others = clear_others
def Adjust(self, resource):
"""Mutates the given config's volumes to match the desired changes.
Args:
resource: k8s_object to adjust
Returns:
The adjusted resource
Raises:
ConfigurationError if there's an attempt to replace the volume a mount
points to whose existing volume has a source of a different type than
the new volume (e.g. mount that points to a volume with a secret source
can't be replaced with a volume that has a config map source).
"""
volume_mounts = resource.template.container.volume_mounts.config_maps
res_volumes = resource.template.volumes.config_maps
_PruneMapping(volume_mounts, self._removes, self._clear_others)
for path, (source_name, source_key) in self._updates.items():
volume_name = _UniqueVolumeName(source_name, resource.template.volumes)
# volume_mounts is a special mapping that filters for the current kind
# of mount and KeyErrors on existing keys with other types.
try:
volume_mounts[path] = volume_name
except KeyError:
raise exceptions.ConfigurationError(
'Cannot update mount [{}] because its mounted volume '
'is of a different source type.'.format(path)
)
res_volumes[volume_name] = self._MakeVolumeSource(
resource.MessagesModule(), source_name, source_key
)
mounted_volumes = frozenset(
itertools.chain.from_iterable(
container.volume_mounts.config_maps.values()
for container in resource.template.containers.values()
)
)
_PruneVolumes(mounted_volumes, res_volumes)
return resource
def _MakeVolumeSource(self, messages, name, key=None):
source = messages.ConfigMapVolumeSource(name=name)
if key is not None:
source.items.append(messages.KeyToPath(key=key, path=key))
return source
class NoTrafficChange(NonTemplateConfigChanger):
"""Represents the user intent to block traffic for a new revision."""
def Adjust(self, resource):
"""Removes LATEST from the services traffic assignments."""
if not resource.generation:
raise exceptions.ConfigurationError(
'--no-traffic not supported when creating a new service.'
)
resource.spec_traffic.ZeroLatestTraffic(
resource.status.latestReadyRevisionName
)
return resource
@dataclasses.dataclass(frozen=True)
class TrafficChanges(NonTemplateConfigChanger):
"""Represents the user intent to change a service's traffic assignments.
Attributes:
new_percentages: New traffic percentages to set.
by_tag: Boolean indicating that new traffic percentages are specified by
tag.
tags_to_update: Traffic tag values to update.
tags_to_remove: Traffic tags to remove.
clear_other_tags: Whether nonupdated tags should be cleared.
"""
new_percentages: Mapping[str, int]
by_tag: bool = False
tags_to_update: Mapping[str, str] = dataclasses.field(default_factory=dict)
tags_to_remove: Container[str] = dataclasses.field(default_factory=list)
clear_other_tags: bool = False
def Adjust(self, resource):
"""Mutates the given service's traffic assignments."""
if self.tags_to_update or self.tags_to_remove or self.clear_other_tags:
resource.spec_traffic.UpdateTags(
self.tags_to_update,
self.tags_to_remove,
self.clear_other_tags,
)
if self.new_percentages:
if self.by_tag:
tag_to_key = resource.spec_traffic.TagToKey()
percentages = {}
for tag in self.new_percentages:
try:
percentages[tag_to_key[tag]] = self.new_percentages[tag]
except KeyError:
raise exceptions.ConfigurationError(
'There is no revision tagged with [{}] in the traffic'
' allocation for [{}].'.format(tag, resource.name)
)
else:
percentages = self.new_percentages
resource.spec_traffic.UpdateTraffic(percentages)
return resource
@dataclasses.dataclass(frozen=True)
class TagOnDeployChange(NonTemplateConfigChanger):
"""The intent to provide a tag for the revision you're currently deploying.
Attributes:
tag: The tag to apply to the new revision.
"""
tag: str
def Adjust(self, resource):
"""Gives the revision that's being created the given tag."""
tags_to_update = {self.tag: resource.template.name}
resource.spec_traffic.UpdateTags(tags_to_update, [], False)
return resource
@dataclasses.dataclass(init=False, frozen=True)
class ContainerCommandChange(ContainerConfigChanger):
"""Represents the user intent to change the 'command' for the container.
Attributes:
command: The command to set in the adjusted container.
"""
command: str
def __init__(self, command, **kwargs):
super().__init__(**kwargs)
object.__setattr__(self, 'command', command)
def AdjustContainer(self, container, messages_mod):
container.command = self.command
@dataclasses.dataclass(init=False, frozen=True)
class ContainerArgsChange(ContainerConfigChanger):
"""Represents the user intent to change the 'args' for the container.
Attributes:
args: The args to set in the adjusted container.
"""
args: list[str]
def __init__(self, args, **kwargs):
super().__init__(**kwargs)
object.__setattr__(self, 'args', args)
def AdjustContainer(self, container, messages_mod):
container.args = self.args
_HTTP2_NAME = 'h2c'
_DEFAULT_PORT = 8080
@dataclasses.dataclass(frozen=True)
class ContainerPortChange(ContainerConfigChanger):
"""Represents the user intent to change the port name and/or number.
Attributes:
port: The port to set, "default" to unset the containerPort field, or None
to not modify the port number.
use_http2: True to set the port name for http/2, False to unset it, or None
to not modify the port name.
**kwargs: ContainerConfigChanger args.
"""
port: str | None = None
use_http2: bool | None = None
def AdjustContainer(self, container, messages_mod):
"""Modify an existing ContainerPort or create a new one."""
port_msg = (
container.ports[0] if container.ports else messages_mod.ContainerPort()
)
old_port = port_msg.containerPort or 8080 # default port
# Set port to given value or clear field
if self.port == 'default':
port_msg.reset('containerPort')
elif self.port is not None:
port_msg.containerPort = int(self.port)
# Set name for http/2 or clear field
if self.use_http2:
port_msg.name = _HTTP2_NAME
elif self.use_http2 is not None:
port_msg.reset('name')
# A port number must be specified
if port_msg.name and not port_msg.containerPort:
port_msg.containerPort = _DEFAULT_PORT
# Use the ContainerPort iff it's not empty
if port_msg.containerPort:
container.ports = [port_msg]
else:
container.reset('ports')
# we also need to reset tcp startup probe port if it exists
if container.startupProbe and container.startupProbe.tcpSocket:
if container.startupProbe.tcpSocket.port == old_port:
if port_msg.containerPort:
container.startupProbe.tcpSocket.port = port_msg.containerPort
else:
container.startupProbe.tcpSocket.reset('port')
@dataclasses.dataclass(frozen=True)
class ExecutionTemplateSpecChange(TemplateConfigChanger):
"""Represents the intent to update field in an execution template's spec.
Attributes:
field: The field to update in the execution template spec.
value: The value to set in the updated field.
"""
field: str
value: Any
def Adjust(self, resource):
setattr(resource.execution_template.spec, self.field, self.value)
return resource
@dataclasses.dataclass(frozen=True)
class JobMaxRetriesChange(TemplateConfigChanger):
"""Represents the user intent to update a job's restart policy.
Attributes:
max_retries: The max retry number to set in the job's restart policy.
"""
max_retries: int
def Adjust(self, resource):
resource.task_template.spec.maxRetries = self.max_retries
return resource
@dataclasses.dataclass(frozen=True)
class JobTaskTimeoutChange(TemplateConfigChanger):
"""Represents the user intent to update a job's instance deadline.
Attributes:
timeout_seconds: The timeout in seconds to set in the job's instance
deadline.
"""
timeout_seconds: int
def Adjust(self, resource):
resource.task_template.spec.timeoutSeconds = self.timeout_seconds
return resource
@dataclasses.dataclass(frozen=True)
class CpuThrottlingChange(TemplateConfigChanger):
"""Sets the cpu-throttling annotation on the service template.
Attributes:
throttling: The throttling annotation value to set.
"""
throttling: bool
def Adjust(self, resource):
resource.template.annotations[
container_resource.CPU_THROTTLE_ANNOTATION
] = str(self.throttling)
return resource
@dataclasses.dataclass(frozen=True)
class StartupCpuBoostChange(TemplateConfigChanger):
"""Sets the startup-cpu-boost annotation on the service template.
Attributes:
cpu_boost: Boolean indicating whether CPU boost should be enabled.
"""
cpu_boost: bool
def Adjust(self, resource):
resource.template.annotations[
container_resource.COLD_START_BOOST_ANNOTATION
] = str(self.cpu_boost)
return resource
@dataclasses.dataclass(frozen=True)
class HealthCheckChange(TemplateConfigChanger):
"""Sets the health-check-disabled annotation on the revision template.
Attributes:
health_check: Boolean indicating whether the health check should be enabled.
"""
health_check: bool
def Adjust(self, resource):
resource.template.annotations[
container_resource.DISABLE_HEALTH_CHECK_ANNOTATION
] = str(not self.health_check)
return resource
@dataclasses.dataclass(frozen=True)
class DefaultUrlChange(NonTemplateConfigChanger):
"""Sets the default-url-disabled annotation on the service.
Attributes:
default_url: Boolean indicating whether the default URL should be enabled.
"""
default_url: bool
def Adjust(self, resource):
resource.annotations[container_resource.DISABLE_URL_ANNOTATION] = str(
not self.default_url
)
return resource
@dataclasses.dataclass(frozen=True)
class InvokerIamChange(NonTemplateConfigChanger):
"""Sets the invoker-iam-disabled annotation on the service.
Attributes:
invoker_iam_check: Boolean indicating whether invoker iam should be enabled
"""
invoker_iam_check: bool
def Adjust(self, resource):
resource.annotations[container_resource.DISABLE_IAM_ANNOTATION] = str(
not self.invoker_iam_check
)
return resource
@dataclasses.dataclass(frozen=True)
class NetworkInterfacesChange(TemplateConfigChanger):
"""Sets or updates the network interfaces annotation on the template.
Attributes:
network_is_set: Boolean indicating whether network was explicitly set by the
user.
network: The network to set.
subnet_is_set: Boolean indicating whether subnet was explicitly set by the
user.
subnet: The subnet to set.
network_tags_is_set: Boolean indicating whether network_tags was explicitly
set by the user.
network_tags: The network tags to set.
"""
network_is_set: bool
network: str
subnet_is_set: bool
subnet: str
network_tags_is_set: bool
network_tags: list[str]
def _SetOrClear(self, m, key, value):
if value:
# If value is present, add key=value to m.
m[key] = value
elif key in m:
# Otherwise clear the key from m.
del m[key]
def Adjust(self, resource):
annotations = resource.template.annotations
network_interface = {}
if k8s_object.NETWORK_INTERFACES_ANNOTATION in annotations:
network_interface = json.loads(
annotations[k8s_object.NETWORK_INTERFACES_ANNOTATION]
)[0]
if self.network_is_set:
self._SetOrClear(network_interface, 'network', self.network)
if self.subnet_is_set:
self._SetOrClear(network_interface, 'subnetwork', self.subnet)
if self.network_tags_is_set:
self._SetOrClear(network_interface, 'tags', self.network_tags)
value = ''
if network_interface:
value = '[{interfaces}]'.format(
interfaces=json.dumps(network_interface, sort_keys=True)
)
self._SetOrClear(
annotations, k8s_object.NETWORK_INTERFACES_ANNOTATION, value
)
# If clear network interfaces, egress setting should be cleared too.
if (
not value
and container_resource.EGRESS_SETTINGS_ANNOTATION in annotations
):
del annotations[container_resource.EGRESS_SETTINGS_ANNOTATION]
return resource
@dataclasses.dataclass(frozen=True)
class ClearNetworkInterfacesChange(TemplateConfigChanger):
"""Clears a network interfaces annotation on the resource."""
def Adjust(self, resource):
annotations = resource.template.annotations
if k8s_object.NETWORK_INTERFACES_ANNOTATION in annotations:
del annotations[k8s_object.NETWORK_INTERFACES_ANNOTATION]
if container_resource.EGRESS_SETTINGS_ANNOTATION in annotations:
del annotations[container_resource.EGRESS_SETTINGS_ANNOTATION]
return resource
@dataclasses.dataclass(frozen=True)
class CustomAudiencesChanges(TemplateConfigChanger):
"""Represents the intent to update the custom audiences.
Attributes:
args: Args to the command.
"""
args: object
@property
def add_custom_audiences(self):
return getattr(self.args, 'add_custom_audiences', None)
@property
def remove_custom_audiences(self):
return getattr(self.args, 'remove_custom_audiences', None)
@property
def set_custom_audiences(self):
return getattr(self.args, 'set_custom_audiences', None)
@property
def clear_custom_audiences(self):
return getattr(self.args, 'clear_custom_audiences', None)
def Adjust(self, resource):
def GetCurrentCustomAudiences():
annotation_val = resource.annotations.get(
k8s_object.CUSTOM_AUDIENCES_ANNOTATION
)
if annotation_val:
return json.loads(annotation_val)
return []
audiences = repeated.ParsePrimitiveArgs(
self, 'custom-audiences', GetCurrentCustomAudiences
)
if audiences is not None:
if audiences:
resource.annotations[k8s_object.CUSTOM_AUDIENCES_ANNOTATION] = (
json.dumps(audiences)
)
elif k8s_object.CUSTOM_AUDIENCES_ANNOTATION in resource.annotations:
del resource.annotations[k8s_object.CUSTOM_AUDIENCES_ANNOTATION]
return resource
@dataclasses.dataclass(frozen=True)
class RuntimeChange(TemplateConfigChanger):
"""Sets the runtime annotation on the service template.
Attributes:
runtime: The runtime annotation value to set.
"""
runtime: str
def Adjust(self, resource):
resource.template.spec.runtimeClassName = self.runtime
return resource
@dataclasses.dataclass(frozen=True)
class GpuTypeChange(TemplateConfigChanger):
"""Sets the gpu-type annotation on the service template.
Attributes:
gpu_type: The gpu_type annotation value to set.
"""
gpu_type: str
def Adjust(self, resource):
if self.gpu_type:
resource.template.node_selector[k8s_object.GPU_TYPE_NODE_SELECTOR] = (
self.gpu_type
)
else:
resource.template.node_selector.pop(
k8s_object.GPU_TYPE_NODE_SELECTOR, None
)
return resource
@dataclasses.dataclass(frozen=True)
class GpuZonalRedundancyChange(TemplateConfigChanger):
"""Sets the gpu zonal redundancy annotation on the revision annotations.
Attributes:
gpu_zonal_redundancy: The gpu_zonal_redundancy annotation value to set.
"""
gpu_zonal_redundancy: bool
def Adjust(self, resource):
resource.template.annotations[
revision.GPU_ZONAL_REDUNDANCY_DISABLED_ANNOTATION
] = str(not self.gpu_zonal_redundancy)
return resource
@dataclasses.dataclass(frozen=True)
class RemoveContainersChange(TemplateConfigChanger):
"""Removes the specified containers.
Attributes:
containers: Containers to remove.
"""
containers: frozenset[str]
@classmethod
def FromContainerNames(cls, containers: Iterable[str]):
"""Returns a RemoveContainersChange that removes the specified containers.
Args:
containers: The names of containers to remove. Duplicate container names
are ignored.
"""
return cls(frozenset(containers))
def Adjust(
self, resource: k8s_object.KubernetesObject
) -> k8s_object.KubernetesObject:
for container in self.containers:
try:
del resource.template.containers[container]
except KeyError:
continue
return resource
@dataclasses.dataclass(frozen=True)
class ContainerDependenciesChange(TemplateConfigChanger):
"""Sets container dependencies.
Updates container dependencies to add the dependencies in new_depencies.
Additionally, dependencies to or from a container which does not exist will be
removed.
Attributes:
new_dependencies: A map of containers to their updated dependencies.
Defaults to an empty map.
"""
new_dependencies: Mapping[str, Iterable[str]] = dataclasses.field(
default_factory=dict
)
def Adjust(
self, resource: k8s_object.KubernetesObject
) -> k8s_object.KubernetesObject:
containers = frozenset(resource.template.containers.keys())
dependencies = resource.template.dependencies
# Filter removed containers from existing container dependencies.
dependencies = {
container_name: [c for c in depends_on if c in containers]
for container_name, depends_on in dependencies.items()
if container_name in containers
}
for container, depends_on in self.new_dependencies.items():
if not container:
container = resource.template.container.name
depends_on = frozenset(depends_on)
missing = depends_on - containers
if missing:
raise exceptions.ConfigurationError(
'--depends_on for container {} references nonexistent'
' containers: {}.'.format(container, ','.join(missing))
)
if depends_on:
dependencies[container] = sorted(depends_on)
else:
del dependencies[container]
resource.template.dependencies = dependencies
return resource
@dataclasses.dataclass(frozen=True)
class RemoveVolumeChange(TemplateConfigChanger):
"""Removes volumes from the service or job template.
Attributes:
removed_volumes: The volumes to remove.
"""
removed_volumes: Iterable[str]
clear_volumes: bool
def Adjust(self, resource):
# having remove and clear is redundant, but we'll allow it.
if self.clear_volumes:
vols = list(resource.template.volumes)
for vol in vols:
del resource.template.volumes[vol]
else:
for to_remove in self.removed_volumes:
if to_remove in resource.template.volumes:
del resource.template.volumes[to_remove]
return resource
@dataclasses.dataclass(frozen=True)
class AddVolumeChange(TemplateConfigChanger):
"""Updates Volumes set on the service or job template.
Attributes:
new_volumes: The volumes to add.
release_track: The resource's release track. Used to verify volume types are
supported in that release track.
"""
new_volumes: Collection[Mapping[str, str]]
release_track: base.ReleaseTrack
def Adjust(self, resource):
for to_add in self.new_volumes:
volumes.add_volume(
to_add,
resource.template.volumes,
resource.MessagesModule(),
self.release_track,
)
return resource
@dataclasses.dataclass(frozen=True)
class RemoveVolumeMountChange(ContainerConfigChanger):
"""Removes Volume Mounts from the container.
Attributes:
removed_mounts: Volume mounts to remove from the adjusted container.
"""
removed_mounts: Collection[str] = dataclasses.field(default_factory=list)
clear_mounts: bool = False
def AdjustContainer(self, container, messages_mod):
if self.clear_mounts:
# iterating over the dictionary wrapper directly while deleting from it
# casues problems.
keys = list(container.volume_mounts)
for mount in keys:
del container.volume_mounts[mount]
else:
for to_remove in self.removed_mounts:
if to_remove in container.volume_mounts:
del container.volume_mounts[to_remove]
return container
@dataclasses.dataclass(frozen=True)
class AddVolumeMountChange(ContainerConfigChanger):
"""Updates Volume Mounts set on the container.
Attributes:
new_mounts: Mounts to add to the adjusted container.
"""
new_mounts: Collection[Mapping[str, str]] = dataclasses.field(
default_factory=list
)
def AdjustContainer(self, container, messages_mod):
for mount in self.new_mounts:
if 'volume' not in mount or 'mount-path' not in mount:
raise exceptions.ConfigurationError(
'Added Volume mounts must have a `volume` and a `mount-path`.'
)
container.volume_mounts[mount['mount-path']] = mount['volume']
return container
@dataclasses.dataclass(frozen=True)
class SetServiceMeshChange(TemplateConfigChanger):
"""Sets the service mesh annotation on the service template.
Attributes:
project: The project to use for the mesh when not specified in mesh_name.
mesh: Mesh resource name in the format of MESH_NAME or
projects/PROJECT/locations/global/meshes/MESH_NAME.
"""
project: str
mesh_name: str
def Adjust(self, resource):
resource.template.annotations[revision.MESH_ANNOTATION] = (
self.mesh_name
if '/' in self.mesh_name
else f'projects/{self.project}/locations/global/meshes/{self.mesh_name}'
)
return resource
@dataclasses.dataclass(frozen=True)
class PresetChange(TemplateConfigChanger):
"""Sets the preset annotation on the service template.
Attributes:
type: The type of preset to use.
config: The config to use for the preset.
flatten: Whether to flatten the preset values into the service template.
"""
type: str
config: Mapping[str, str] = dataclasses.field(default_factory=dict)
flatten: bool = True
def Adjust(self, resource):
# TODO(b/412784660): Add support for multiple presets and merge existing
# presets.
presets = []
if service.PRESETS_ANNOTATION in resource.annotations:
presets = json.loads(resource.annotations[service.PRESETS_ANNOTATION])
if presets and presets[0]['type'] != self.type:
presets.append({
'type': self.type,
'config': self.config,
'flatten': self.flatten,
})
else:
presets = [{
'type': self.type,
'config': self.config,
'flatten': self.flatten,
}]
resource.annotations[service.PRESETS_ANNOTATION] = json.dumps(presets)
return resource
@dataclasses.dataclass(frozen=True)
class RemovePresetsChange(TemplateConfigChanger):
"""Removes one or more presets from annotation from the service metadata.
Attributes:
clear_presets: Whether to clear all presets.
"""
clear_presets: bool
def Adjust(self, resource):
presets = json.loads(
resource.annotations.get(service.PRESETS_ANNOTATION, '[]')
)
if presets and self.clear_presets:
del resource.annotations[service.PRESETS_ANNOTATION]
return resource