File: //snap/google-cloud-cli/394/lib/surface/secrets/update.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Update an existing secret."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from googlecloudsdk.api_lib.secrets import api as secrets_api
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.secrets import args as secrets_args
from googlecloudsdk.command_lib.secrets import log as secrets_log
from googlecloudsdk.command_lib.secrets import util as secrets_util
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.command_lib.util.args import map_util
from googlecloudsdk.core.console import console_io
@base.ReleaseTracks(base.ReleaseTrack.GA)
@base.DefaultUniverseOnly
class Update(base.UpdateCommand):
  r"""Update a secret's metadata.

  Update a secret's metadata (e.g. labels). This command will
  return an error if given a secret that does not exist.

  ## EXAMPLES

  Update the label of a secret named `my-secret`.

    $ {command} my-secret --update-labels=foo=bar

  Update the label of a secret using an etag.

    $ {command} my-secret --update-labels=foo=bar --etag=123

  Update the expiration of a secret named 'my-secret' using a ttl.

    $ {command} my-secret --ttl="600s"

  Update the expiration of a secret named 'my-secret' using an expire-time.

    $ {command} my-secret --expire-time="2030-01-01T08:15:30-05:00"

  Remove the expiration of a secret named 'my-secret'.

    $ {command} my-secret --remove-expiration

  Update a secret to have a next-rotation-time:

    $ {command} my-secret --next-rotation-time="2030-01-01T15:30:00-05:00"

  Update a secret to have a next-rotation-time and rotation-period:

    $ {command} my-secret --next-rotation-time="2030-01-01T15:30:00-05:00"
    --rotation-period="7200s"

  Update a secret to remove the next-rotation-time:

    $ {command} my-secret --remove-next-rotation-time

  Update a secret to clear rotation policy:

    $ {command} my-secret --remove-rotation-schedule

  Update version destroy ttl of a secret:

    $ {command} my-secret --version-destroy-ttl="86400s"

  Disable delayed secret version destroy:

    $ {command} my-secret --remove-version-destroy-ttl
  """

  NO_CHANGES_MESSAGE = (
      'There are no changes to the secret [{secret}] for update.')
  SECRET_MISSING_MESSAGE = (
      'The secret [{secret}] cannot be updated because it does not exist. '
      'Please use the create command to create a new secret.')
  CONFIRM_EXPIRE_TIME_MESSAGE = (
      'This secret and all of its versions will be automatically deleted at '
      'the requested expire-time of [{expire_time}].')
  CONFIRM_TTL_MESSAGE = (
      'This secret and all of its versions will be automatically deleted '
      'after the requested ttl of [{ttl}] has elapsed.')
  REGIONAL_KMS_FLAG_MESSAGE = (
      'The --regional-kms-key-name or --remove-regional-kms-key-name flag can'
      ' only be used when updating a regional secret with "--location".'
  )

  # Ordered (update-mask path, flag dests) pairs. A path is added to the update
  # mask when at least one of its flags was given on the command line. The
  # 'labels' path is handled separately through labels_util.
  _MASK_FIELD_FLAGS = (
      ('ttl', ('ttl',)),
      ('expire_time', ('expire_time', 'remove_expiration')),
      (
          'rotation.next_rotation_time',
          (
              'next_rotation_time',
              'remove_next_rotation_time',
              'remove_rotation_schedule',
          ),
      ),
      (
          'rotation.rotation_period',
          (
              'rotation_period',
              'remove_rotation_period',
              'remove_rotation_schedule',
          ),
      ),
      ('topics', ('add_topics', 'remove_topics', 'clear_topics')),
      (
          'version_aliases',
          (
              'update_version_aliases',
              'remove_version_aliases',
              'clear_version_aliases',
          ),
      ),
      (
          'annotations',
          ('update_annotations', 'remove_annotations', 'clear_annotations'),
      ),
      (
          'version_destroy_ttl',
          ('version_destroy_ttl', 'remove_version_destroy_ttl'),
      ),
      (
          'customer_managed_encryption',
          ('regional_kms_key_name', 'remove_regional_kms_key_name'),
      ),
  )

  # Flags reported to the user when the command is invoked without any change.
  _UPDATE_FLAGS = (
      '--clear-labels',
      '--remove-labels',
      '--update-labels',
      '--ttl',
      '--expire-time',
      '--remove-expiration',
      '--clear-topics',
      '--remove-topics',
      '--add-topics',
      '--update-version-aliases',
      '--remove-version-aliases',
      '--clear-version-aliases',
      '--update-annotations',
      '--remove-annotations',
      '--clear-annotations',
      '--next-rotation-time',
      '--remove-next-rotation-time',
      '--rotation-period',
      '--remove-rotation-period',
      '--remove-rotation-schedule',
      '--version-destroy-ttl',
      '--remove-version-destroy-ttl',
      '--remove-regional-kms-key-name',
      '--regional-kms-key-name',
  )

  @staticmethod
  def Args(parser):
    """Args is called by calliope to gather arguments for this command.

    Args:
      parser: An argparse parser that you can use to add arguments that will be
        available to this command.
    """
    secrets_args.AddSecret(
        parser, purpose='to update', positional=True, required=True
    )
    secrets_args.AddLocation(parser, purpose='to update secret', hidden=False)
    # The update/remove/clear variants of each map flag are mutually exclusive.
    alias = parser.add_group(mutex=True, help='Version Aliases')
    annotations = parser.add_group(mutex=True, help='Annotations')
    labels_util.AddUpdateLabelsFlags(parser)
    secrets_args.AddSecretEtag(parser, action='updated')
    secrets_args.AddUpdateExpirationGroup(parser)
    secrets_args.AddUpdateTopicsGroup(parser)
    secrets_args.AddUpdateRotationGroup(parser)
    secrets_args.AddUpdateVersionDestroyTTL(parser)
    secrets_args.AddUpdateRegionalKmsKey(parser)
    map_util.AddMapUpdateFlag(alias, 'version-aliases', 'Version Aliases', str,
                              int)
    map_util.AddMapRemoveFlag(alias, 'version-aliases', 'Version Aliases', str)
    map_util.AddMapClearFlag(alias, 'version-aliases', 'Version Aliases')
    map_util.AddMapUpdateFlag(annotations, 'annotations', 'Annotations', str,
                              str)
    map_util.AddMapRemoveFlag(annotations, 'annotations', 'Annotations', str)
    map_util.AddMapClearFlag(annotations, 'annotations', 'Annotations')

  def _RunUpdate(self, original, args):
    """Builds the update request from the parsed flags and sends it.

    Args:
      original: The existing secret message, as looked up by Run. It is read
        but not modified.
      args: argparse.Namespace, The arguments that this command was invoked
        with.

    Returns:
      The value returned by the Secrets API client's Update call.

    Raises:
      exceptions.MinimumArgumentException: If no flag that changes the secret
        was specified.
      exceptions.RequiredArgumentException: If a regional KMS key flag was
        given without a location.
    """
    api_version = secrets_api.GetApiFromTrack(self.ReleaseTrack())
    messages = secrets_api.GetMessages(version=api_version)
    secret_ref = args.CONCEPTS.secret.Parse()

    # Collect the list of update masks
    update_mask = []
    labels_diff = labels_util.Diff.FromUpdateArgs(args)
    if labels_diff.MayHaveUpdates():
      update_mask.append('labels')
    for field, flags in self._MASK_FIELD_FLAGS:
      if any(args.IsSpecified(flag) for flag in flags):
        update_mask.append(field)

    # Validations. These run before any confirmation prompt so the user is
    # never asked to confirm a request that is going to be rejected anyway.
    if not update_mask:
      raise exceptions.MinimumArgumentException(
          list(self._UPDATE_FLAGS),
          self.NO_CHANGES_MESSAGE.format(secret=secret_ref.Name()),
      )
    if not args.location and (
        args.regional_kms_key_name or args.remove_regional_kms_key_name
    ):
      raise exceptions.RequiredArgumentException(
          'location', self.REGIONAL_KMS_FLAG_MESSAGE
      )

    labels_update = labels_diff.Apply(messages.Secret.LabelsValue,
                                      original.labels)
    labels = original.labels
    if labels_update.needs_update:
      labels = labels_update.labels

    # Setting an expiration schedules deletion of the secret, so ask first.
    if args.expire_time:
      msg = self.CONFIRM_EXPIRE_TIME_MESSAGE.format(
          expire_time=args.expire_time)
      console_io.PromptContinue(
          msg, throw_if_unattended=True, cancel_on_no=True)
    if args.ttl:
      msg = self.CONFIRM_TTL_MESSAGE.format(ttl=args.ttl)
      console_io.PromptContinue(
          msg, throw_if_unattended=True, cancel_on_no=True)

    if 'topics' in update_mask:
      topics = secrets_util.ApplyTopicsUpdate(args, original.topics)
    else:
      topics = []

    version_aliases = []
    if 'version_aliases' in update_mask:
      # Fall back to an empty map locally instead of writing it onto the
      # caller's message.
      current_aliases = original.versionAliases
      if current_aliases is None:
        current_aliases = messages.Secret.VersionAliasesValue(
            additionalProperties=[])
      version_aliases_dict = secrets_util.ApplyAliasUpdate(
          args, current_aliases.additionalProperties)
      version_aliases = [
          messages.Secret.VersionAliasesValue.AdditionalProperty(
              key=alias, value=version)
          for alias, version in version_aliases_dict.items()
      ]

    annotations = []
    if 'annotations' in update_mask:
      current_annotations = original.annotations
      if current_annotations is None:
        current_annotations = messages.Secret.AnnotationsValue(
            additionalProperties=[])
      annotations_dict = secrets_util.ApplyAnnotationsUpdate(
          args, current_annotations.additionalProperties)
      annotations = [
          messages.Secret.AnnotationsValue.AdditionalProperty(
              key=annotation, value=metadata)
          for annotation, metadata in annotations_dict.items()
      ]

    # The parsed flag value is sent to the API with an 's' suffix appended.
    if args.version_destroy_ttl:
      version_destroy_ttl = f'{args.version_destroy_ttl}s'
    else:
      version_destroy_ttl = None

    secret = secrets_api.Secrets(api_version=api_version).Update(
        secret_ref=secret_ref,
        labels=labels,
        version_aliases=version_aliases,
        annotations=annotations,
        update_mask=update_mask,
        etag=args.etag,
        expire_time=args.expire_time,
        ttl=args.ttl,
        topics=topics,
        next_rotation_time=args.next_rotation_time,
        rotation_period=args.rotation_period,
        version_destroy_ttl=version_destroy_ttl,
        regional_kms_key_name=args.regional_kms_key_name,
        secret_location=args.location,
    )
    secrets_log.Secrets().Updated(secret_ref)
    return secret

  def Run(self, args):
    """Run is called by calliope to update the secret.

    Args:
      args: argparse.Namespace, The arguments that this command was invoked
        with.

    Returns:
      The API call to service for secret update.

    Raises:
      exceptions.InvalidArgumentException: If the secret does not exist.
    """
    api_version = secrets_api.GetApiFromTrack(self.ReleaseTrack())
    secret_ref = args.CONCEPTS.secret.Parse()
    secret = secrets_api.Secrets(api_version=api_version).GetOrNone(
        secret_ref, secret_location=args.location
    )

    # Secret does not exist
    if secret is None:
      raise exceptions.InvalidArgumentException(
          'secret', self.SECRET_MISSING_MESSAGE.format(secret=secret_ref.Name())
      )

    # The secret exists, update it
    return self._RunUpdate(secret, args)
@base.ReleaseTracks(base.ReleaseTrack.BETA)
@base.DefaultUniverseOnly
class UpdateBeta(Update):
  r"""Update a secret's metadata.

  Update a secret's metadata (e.g. labels). This command will
  return an error if given a secret that does not exist.

  ## EXAMPLES

  Update the label of a secret named `my-secret`.

    $ {command} my-secret --update-labels=foo=bar

  Update the label of a secret using etag.

    $ {command} my-secret --update-labels=foo=bar --etag=123

  Update the expiration of a secret named 'my-secret' using a ttl.

    $ {command} my-secret --ttl="600s"

  Update the expiration of a secret named 'my-secret' using an expire-time.

    $ {command} my-secret --expire-time="2030-01-01T08:15:30-05:00"

  Remove the expiration of a secret named 'my-secret'.

    $ {command} my-secret --remove-expiration

  Update a secret to have a next-rotation-time:

    $ {command} my-secret --next-rotation-time="2030-01-01T15:30:00-05:00"

  Update a secret to have a next-rotation-time and rotation-period:

    $ {command} my-secret --next-rotation-time="2030-01-01T15:30:00-05:00"
    --rotation-period="7200s"

  Update a secret to remove the next-rotation-time:

    $ {command} my-secret --remove-next-rotation-time

  Update a secret to clear rotation policy:

    $ {command} my-secret --remove-rotation-schedule

  Update version destroy ttl of a secret:

    $ {command} my-secret --version-destroy-ttl="86400s"

  Disable delayed secret version destroy:

    $ {command} my-secret --remove-version-destroy-ttl
  """

  NO_CHANGES_MESSAGE = (
      'There are no changes to the secret [{secret}] for update'
  )

  # Ordered (update-mask path, flag dests) pairs. A path is added to the update
  # mask when at least one of its flags was given on the command line. The
  # 'labels' path is handled separately through labels_util.
  _MASK_FIELD_FLAGS = (
      ('ttl', ('ttl',)),
      ('expire_time', ('expire_time', 'remove_expiration')),
      ('topics', ('add_topics', 'remove_topics', 'clear_topics')),
      (
          'rotation.next_rotation_time',
          (
              'next_rotation_time',
              'remove_next_rotation_time',
              'remove_rotation_schedule',
          ),
      ),
      (
          'rotation.rotation_period',
          (
              'rotation_period',
              'remove_rotation_period',
              'remove_rotation_schedule',
          ),
      ),
      (
          'version_aliases',
          (
              'update_version_aliases',
              'remove_version_aliases',
              'clear_version_aliases',
          ),
      ),
      (
          'annotations',
          ('update_annotations', 'remove_annotations', 'clear_annotations'),
      ),
      (
          'version_destroy_ttl',
          ('version_destroy_ttl', 'remove_version_destroy_ttl'),
      ),
      (
          'customer_managed_encryption',
          ('regional_kms_key_name', 'remove_regional_kms_key_name'),
      ),
  )

  # Flags reported to the user when the command is invoked without any change.
  _UPDATE_FLAGS = (
      '--clear-labels',
      '--remove-labels',
      '--update-labels',
      '--ttl',
      '--expire-time',
      '--remove-expiration',
      '--clear-topics',
      '--remove-topics',
      '--add-topics',
      '--update-version-aliases',
      '--remove-version-aliases',
      '--clear-version-aliases',
      '--update-annotations',
      '--remove-annotations',
      '--clear-annotations',
      '--next-rotation-time',
      '--remove-next-rotation-time',
      '--rotation-period',
      '--remove-rotation-period',
      '--remove-rotation-schedule',
      '--version-destroy-ttl',
      '--remove-version-destroy-ttl',
      '--remove-regional-kms-key-name',
      '--regional-kms-key-name',
  )

  @staticmethod
  def Args(parser):
    """Args is called by calliope to gather arguments for this command.

    Args:
      parser: An argparse parser that you can use to add arguments that will be
        available to this command.
    """
    secrets_args.AddSecret(
        parser, purpose='to update', positional=True, required=True
    )
    secrets_args.AddLocation(parser, purpose='to update secret', hidden=False)
    # The update/remove/clear variants of each map flag are mutually exclusive.
    alias = parser.add_group(mutex=True, help='Version Aliases')
    annotations = parser.add_group(mutex=True, help='Annotations')
    labels_util.AddUpdateLabelsFlags(parser)
    secrets_args.AddSecretEtag(parser, action='updated')
    secrets_args.AddUpdateExpirationGroup(parser)
    secrets_args.AddUpdateRotationGroup(parser)
    secrets_args.AddUpdateTopicsGroup(parser)
    secrets_args.AddUpdateVersionDestroyTTL(parser)
    secrets_args.AddUpdateRegionalKmsKey(parser)
    map_util.AddMapUpdateFlag(alias, 'version-aliases', 'Version Aliases', str,
                              int)
    map_util.AddMapRemoveFlag(alias, 'version-aliases', 'Version Aliases', str)
    map_util.AddMapClearFlag(alias, 'version-aliases', 'Version Aliases')
    map_util.AddMapUpdateFlag(annotations, 'annotations', 'Annotations', str,
                              str)
    map_util.AddMapRemoveFlag(annotations, 'annotations', 'Annotations', str)
    map_util.AddMapClearFlag(annotations, 'annotations', 'Annotations')

  def _RunUpdate(self, original, args):
    """Builds the update request from the parsed flags and sends it.

    Args:
      original: The existing secret message, as looked up by Run. It is read
        but not modified.
      args: argparse.Namespace, The arguments that this command was invoked
        with.

    Returns:
      The value returned by the Secrets API client's Update call.

    Raises:
      exceptions.MinimumArgumentException: If no flag that changes the secret
        was specified.
      exceptions.RequiredArgumentException: If a regional KMS key flag was
        given without a location.
    """
    api_version = secrets_api.GetApiFromTrack(self.ReleaseTrack())
    messages = secrets_api.GetMessages(version=api_version)
    secret_ref = args.CONCEPTS.secret.Parse()

    # Collect the list of update masks
    update_mask = []
    labels_diff = labels_util.Diff.FromUpdateArgs(args)
    if labels_diff.MayHaveUpdates():
      update_mask.append('labels')
    for field, flags in self._MASK_FIELD_FLAGS:
      if any(args.IsSpecified(flag) for flag in flags):
        update_mask.append(field)

    # Validations. These run before any confirmation prompt so the user is
    # never asked to confirm a request that is going to be rejected anyway.
    if not update_mask:
      raise exceptions.MinimumArgumentException(
          list(self._UPDATE_FLAGS),
          self.NO_CHANGES_MESSAGE.format(secret=secret_ref.Name()),
      )
    if not args.location and (
        args.regional_kms_key_name or args.remove_regional_kms_key_name
    ):
      raise exceptions.RequiredArgumentException(
          'location', self.REGIONAL_KMS_FLAG_MESSAGE
      )

    labels_update = labels_diff.Apply(messages.Secret.LabelsValue,
                                      original.labels)
    labels = original.labels
    if labels_update.needs_update:
      labels = labels_update.labels

    if 'topics' in update_mask:
      topics = secrets_util.ApplyTopicsUpdate(args, original.topics)
    else:
      topics = []

    version_aliases = []
    if 'version_aliases' in update_mask:
      # Fall back to an empty map locally instead of writing it onto the
      # caller's message.
      current_aliases = original.versionAliases
      if current_aliases is None:
        current_aliases = messages.Secret.VersionAliasesValue(
            additionalProperties=[]
        )
      version_aliases_dict = secrets_util.ApplyAliasUpdate(
          args, current_aliases.additionalProperties
      )
      version_aliases = [
          messages.Secret.VersionAliasesValue.AdditionalProperty(
              key=alias, value=version
          )
          for alias, version in version_aliases_dict.items()
      ]

    annotations = []
    if 'annotations' in update_mask:
      current_annotations = original.annotations
      if current_annotations is None:
        current_annotations = messages.Secret.AnnotationsValue(
            additionalProperties=[]
        )
      annotations_dict = secrets_util.ApplyAnnotationsUpdate(
          args, current_annotations.additionalProperties
      )
      annotations = [
          messages.Secret.AnnotationsValue.AdditionalProperty(
              key=annotation, value=metadata
          )
          for annotation, metadata in annotations_dict.items()
      ]

    # Setting an expiration schedules deletion of the secret, so ask first.
    if args.expire_time:
      msg = self.CONFIRM_EXPIRE_TIME_MESSAGE.format(
          expire_time=args.expire_time
      )
      console_io.PromptContinue(
          msg, throw_if_unattended=True, cancel_on_no=True
      )
    if args.ttl:
      msg = self.CONFIRM_TTL_MESSAGE.format(ttl=args.ttl)
      console_io.PromptContinue(
          msg, throw_if_unattended=True, cancel_on_no=True
      )

    # The parsed flag value is sent to the API with an 's' suffix appended.
    if args.version_destroy_ttl:
      version_destroy_ttl = f'{args.version_destroy_ttl}s'
    else:
      version_destroy_ttl = None

    secret = secrets_api.Secrets(api_version=api_version).Update(
        secret_ref=secret_ref,
        labels=labels,
        update_mask=update_mask,
        version_aliases=version_aliases,
        annotations=annotations,
        etag=args.etag,
        expire_time=args.expire_time,
        ttl=args.ttl,
        topics=topics,
        next_rotation_time=args.next_rotation_time,
        rotation_period=args.rotation_period,
        version_destroy_ttl=version_destroy_ttl,
        regional_kms_key_name=args.regional_kms_key_name,
        secret_location=args.location,
    )
    secrets_log.Secrets().Updated(secret_ref)
    return secret

  # Run is inherited from Update: it resolves the API version from
  # self.ReleaseTrack(), looks the secret up, and dispatches to
  # self._RunUpdate, so the BETA track needs no override of its own.