HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/workflows/flags.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared flags for Cloud Workflows commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from googlecloudsdk.api_lib.util import apis
from googlecloudsdk.calliope import arg_parsers
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.calliope.concepts import concepts
from googlecloudsdk.calliope.concepts import deps
from googlecloudsdk.command_lib.util.args import labels_util
from googlecloudsdk.command_lib.util.args import map_util
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import files
import six

# Regular expression for the full resource name of a KMS crypto key; used to
# validate the value of --kms-key.
_KEY_NAME_PATTERN = (
    r'^projects/[^/]+'
    r'/locations/[^/]+'
    r'/keyRings/[a-zA-Z0-9_-]+'
    r'/cryptoKeys/[a-zA-Z0-9_-]+$'
)
# Error text paired with _KEY_NAME_PATTERN when building the --kms-key
# validator.
_KEY_NAME_ERROR = (
    'KMS key name should match '
    'projects/{project}/locations/{location}/keyRings/{keyring}'
    '/cryptoKeys/{cryptokey} '
    'and only contain characters from the valid character set for a KMS key.'
)
# Passed as max_length to the --set-env-vars dictionary parser.
USER_ENV_VARS_LIMIT = 20
# Sentinel compared by identity in SetUserEnvVars: when env_vars is this
# object, the workflow's userEnvVars is set to None.
CLEAR_ENVIRONMENT = object()


def LocationAttributeConfig():
  """Returns the AttributeConfig used for the location of a resource.

  The attribute is given a fallthrough to the [workflows/location] property.
  """
  location_property = properties.FromString('workflows/location')
  location_help = (
      'Cloud location for the {resource}.  '
      'Alternatively, set the property [workflows/location].'
  )
  return concepts.ResourceParameterAttributeConfig(
      name='location',
      fallthroughs=[deps.PropertyFallthrough(location_property)],
      help_text=location_help,
  )


def WorkflowAttributeConfig():
  """Returns the AttributeConfig used for the workflow of a resource."""
  workflow_attribute = concepts.ResourceParameterAttributeConfig(
      name='workflow',
      help_text='Workflow for the {resource}.',
  )
  return workflow_attribute


def ExecutionAttributeConfig():
  """Returns the AttributeConfig used for the execution of a resource."""
  execution_attribute = concepts.ResourceParameterAttributeConfig(
      name='execution',
      help_text='Execution for the {resource}.',
  )
  return execution_attribute


def GetWorkflowResourceSpec():
  """Returns the ResourceSpec for the workflows collection.

  The spec is made of the default project attribute plus the location and
  workflow attribute configs defined in this module.
  """
  project_config = concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG
  location_config = LocationAttributeConfig()
  workflow_config = WorkflowAttributeConfig()
  return concepts.ResourceSpec(
      'workflows.projects.locations.workflows',
      resource_name='workflow',
      projectsId=project_config,
      locationsId=location_config,
      workflowsId=workflow_config,
  )


def GetExecutionResourceSpec():
  """Returns the ResourceSpec for the workflow executions collection.

  The spec is made of the default project attribute plus the workflow,
  location and execution attribute configs defined in this module.
  """
  project_config = concepts.DEFAULT_PROJECT_ATTRIBUTE_CONFIG
  workflow_config = WorkflowAttributeConfig()
  location_config = LocationAttributeConfig()
  execution_config = ExecutionAttributeConfig()
  return concepts.ResourceSpec(
      'workflowexecutions.projects.locations.workflows.executions',
      resource_name='execution',
      projectsId=project_config,
      workflowsId=workflow_config,
      locationsId=location_config,
      executionsId=execution_config,
  )


def AddWorkflowResourceArg(parser, verb):
  """Registers the required `workflow` resource argument on the parser.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  workflow_spec = GetWorkflowResourceSpec()
  workflow_help = 'Name of the workflow {}.'.format(verb)
  workflow_concept = concept_parsers.ConceptParser.ForResource(
      'workflow',
      workflow_spec,
      workflow_help,
      required=True,
  )
  workflow_concept.AddToParser(parser)


def AddExecutionResourceArg(parser, verb):
  """Registers the required `execution` resource argument on the parser.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  execution_spec = GetExecutionResourceSpec()
  execution_help = 'Name of the execution {}.'.format(verb)
  execution_concept = concept_parsers.ConceptParser.ForResource(
      'execution',
      execution_spec,
      execution_help,
      required=True,
  )
  execution_concept.AddToParser(parser)


def AddSourceArg(parser):
  """Registers the --source flag naming the workflow source file.

  Args:
    parser: the parser for the command.
  """
  source_help = (
      'Location of a workflow source code to deploy. '
      'Required on first deployment. '
      'Location needs to be defined as a path to a local file '
      'with the source code.'
  )
  parser.add_argument('--source', help=source_help)


def AddDescriptionArg(parser):
  """Registers the --description flag for the workflow.

  Args:
    parser: the parser for the command.
  """
  description_help = 'The description of the workflow to deploy.'
  parser.add_argument('--description', help=description_help)


def AddServiceAccountArg(parser):
  """Registers the --service-account flag for the workflow identity.

  Args:
    parser: the parser for the command.
  """
  service_account_help = (
      'The service account that should be used as the workflow identity. '
      '"projects/PROJECT_ID/serviceAccounts/" prefix may be skipped from '
      'the full resource name, in that case "projects/-/serviceAccounts/" '
      'is prepended to the service account ID.'
  )
  parser.add_argument('--service-account', help=service_account_help)


def AddDataArg(parser):
  """Registers the --data flag carrying the workflow's input argument.

  Args:
    parser: the parser for the command.
  """
  data_help = (
      'JSON string with data that will be passed '
      'to the workflow as an argument.'
  )
  parser.add_argument('--data', help=data_help)


def AddLoggingArg(parser):
  """Registers the --call-log-level choice flag for an execution.

  Args:
    parser: the parser for the command.
  """
  call_log_choices = {
      'none': 'No logging level specified.',
      'log-all-calls': (
          'Log all calls to subworkflows or library functions '
          'and their results.'
      ),
      'log-errors-only': 'Log when a call is stopped due to an exception.',
      'log-none': 'Perform no call logging.',
  }
  base.ChoiceArgument(
      '--call-log-level',
      choices=call_log_choices,
      help_str='Level of call logging to apply during execution.',
      default='none',
  ).AddToParser(parser)


def AddExecutionHistoryLevelArg(parser):
  """Registers the --execution-history-level choice flag for an execution.

  Args:
    parser: the parser for the command.
  """
  history_choices = {
      'none': 'No execution history level specified.',
      'execution-history-basic': 'Enable execution history basic feature.',
      'execution-history-detailed': (
          'Enable execution history detailed feature.'
      ),
  }
  base.ChoiceArgument(
      '--execution-history-level',
      choices=history_choices,
      help_str='Level of execution history to apply during execution.',
      default='none',
  ).AddToParser(parser)


def AddDisableOverflowBufferArg(parser):
  """Registers the boolean flag that turns off execution backlogging.

  Args:
    parser: the parser for the command.
  """
  overflow_help = (
      'If set, the execution will not be backlogged when the '
      'concurrency quota is exhausted. '
      'Backlogged executions start when the concurrency quota '
      'becomes available.'
  )
  parser.add_argument(
      '--disable-concurrency-quota-overflow-buffering',
      action='store_true',
      default=False,
      help=overflow_help,
  )


def AddBetaLoggingArg(parser):
  """Registers the --call-log-level choice flag with the beta choice set.

  Unlike AddLoggingArg, there is no 'log-none' choice here and 'none' is
  described as performing no call logging.

  Args:
    parser: the parser for the command.
  """
  call_log_choices = {
      'none': 'Perform no call logging.',
      'log-all-calls': (
          'Log all calls to subworkflows or library functions '
          'and their results.'
      ),
      'log-errors-only': 'Log when a call is stopped due to an exception.',
  }
  base.ChoiceArgument(
      '--call-log-level',
      choices=call_log_choices,
      help_str='Level of call logging to apply during execution.',
      default='none',
  ).AddToParser(parser)


def AddWorkflowLoggingArg(parser):
  """Registers the --call-log-level choice flag for a workflow.

  Args:
    parser: the parser for the command.
  """
  call_log_choices = {
      'none': 'No logging level specified.',
      'log-all-calls': (
          'Log all calls to subworkflows or library functions '
          'and their results.'
      ),
      'log-errors-only': 'Log when a call is stopped due to an exception.',
      'log-none': 'Perform no call logging.',
  }
  base.ChoiceArgument(
      '--call-log-level',
      choices=call_log_choices,
      help_str='Level of call logging to apply by default for the workflow.',
      default='none',
  ).AddToParser(parser)


def AddWorkflowTagsArg(parser):
  """Registers the --tags flag, parsed as a KEY=VALUE dictionary.

  Args:
    parser: the parser for the command.
  """
  tags_help = (
      'List of tags KEY=VALUE pairs to bind.\n'
      'Each item must be expressed as\n'
      '"<tag-key-namespaced-name>=<tag-value-short-name>".\n'
      '\n'
      'Example: 123/environment=production,123/costCenter=marketing'
  )
  base.Argument(
      '--tags',
      metavar='KEY=VALUE',
      type=arg_parsers.ArgDict(),
      action=arg_parsers.UpdateAction,
      help=tags_help,
  ).AddToParser(parser)


def SetWorkflowsTagsArg(args, workflow, tags_message):
  """Sets the workflow's tags from --tags, if the flag was provided.

  The workflow is left untouched when --tags was not specified or when its
  parsed value is empty.

  Args:
    args: Args passed to the command.
    workflow: The workflow on which to set the tags.
    tags_message: The message class for the tags value. It is called with an
      `additionalProperties` list and its `AdditionalProperty` attribute is
      called with `key` and `value`.
  """
  if not args.IsSpecified('tags'):
    return
  tags = args.tags
  if not tags:
    return
  # Sorted for test stability.
  workflow.tags = tags_message(
      additionalProperties=[
          tags_message.AdditionalProperty(key=key, value=value)
          for key, value in sorted(tags.items())
      ]
  )


def AddWorkflowExecutionHistoryLevelArg(parser):
  """Registers the --execution-history-level choice flag for a workflow.

  Args:
    parser: the parser for the command.
  """
  history_choices = {
      'none': 'No execution history level specified.',
      'execution-history-basic': 'Enable basic execution history.',
      'execution-history-detailed': (
          'Enable detailed execution history, including expected '
          'iterations and in-scope variable values.'
      ),
  }
  base.ChoiceArgument(
      '--execution-history-level',
      choices=history_choices,
      help_str='Level of execution history to apply for the workflow.',
      default='none',
  ).AddToParser(parser)


def SetWorkflowLoggingArg(loglevel, workflow, updated_fields):
  """Stores the call log level on the workflow and records the change.

  Nothing happens when loglevel is None.

  Args:
    loglevel: Parsed callLogLevel to be set on the workflow.
    workflow: The workflow in which to set the call-log-level.
    updated_fields: A list to which 'callLogLevel' is appended when the
      level is set.
  """
  if loglevel is None:
    return
  workflow.callLogLevel = loglevel
  updated_fields.append('callLogLevel')


def SetWorkflowExecutionHistoryLevelArg(
    execution_history_level, workflow, updated_fields
):
  """Stores the execution history level on the workflow and records it.

  Nothing happens when execution_history_level is None.

  Args:
    execution_history_level: Parsed executionHistoryLevel to be set on the
      workflow.
    workflow: The workflow in which to set the execution-history-level.
    updated_fields: A list to which 'executionHistoryLevel' is appended when
      the level is set.
  """
  if execution_history_level is None:
    return
  workflow.executionHistoryLevel = execution_history_level
  updated_fields.append('executionHistoryLevel')


# Flags for CMEK
def AddKmsKeyFlags(parser):
  """Registers the hidden, mutually exclusive --kms-key/--clear-kms-key flags.

  Args:
    parser: The flag parser used for the specified command.
  """
  set_key_help = """\
        Sets the user managed KMS crypto key used to encrypt the new Workflow
        Revision and the Executions associated with it.

        The KMS crypto key name should match the pattern
        `projects/${PROJECT}/locations/${LOCATION}/keyRings/${KEYRING}/cryptoKeys/${CRYPTOKEY}`
        where ${PROJECT} is the project, ${LOCATION} is the location of the key
        ring, and ${KEYRING} is the key ring that contains the ${CRYPTOKEY}
        crypto key.
      """
  clear_key_help = """\
        Creates the new Workflow Revision and its associated Executions without
        the KMS key specified on the previous revision.
      """

  cmek_group = parser.add_group(mutex=True, hidden=True)

  # --kms-key values are checked against the module-level key name pattern.
  key_name_validator = arg_parsers.RegexpValidator(
      _KEY_NAME_PATTERN, _KEY_NAME_ERROR
  )
  cmek_group.add_argument(
      '--kms-key', type=key_name_validator, help=set_key_help
  )
  cmek_group.add_argument(
      '--clear-kms-key', action='store_true', help=clear_key_help
  )


def SetKmsKey(args, workflow, updated_fields):
  """Applies --kms-key / --clear-kms-key to the workflow and records it.

  Nothing happens when neither flag was specified. Otherwise cryptoKeyName is
  set to None if clear_kms_key is truthy, and to the kms_key value if not.

  Args:
    args: Args passed to the command.
    workflow: The workflow in which to set the KMS key.
    updated_fields: A list to which 'cryptoKeyName' is appended when the key
      is set or cleared.
  """
  kms_flag_names = ('kms_key', 'clear_kms_key')
  if not any(args.IsSpecified(name) for name in kms_flag_names):
    return
  if args.clear_kms_key:
    workflow.cryptoKeyName = None
  else:
    workflow.cryptoKeyName = args.kms_key
  updated_fields.append('cryptoKeyName')


def AddUserEnvVarsFlags(parser):
  """Registers the mutually exclusive user environment variable flags.

  The group holds, in this order: --set-env-vars, --env-vars-file,
  --clear-env-vars, --remove-env-vars and --update-env-vars.

  Args:
    parser: The flag parser used for the specified command.
  """
  group_help = """\
        Flags to configure user-defined environment variables for a
        workflow.

        Keys can't be empty strings and can't start with `GOOGLE`
        or `WORKFLOWS`. We recommend that environment variable keys consist
        solely of uppercase letters, digits, and underscores (`_`), and that
        they don't begin with a digit. Consider prefixing your user-defined
        environment variables with a unique key to avoid conflicts with other
        variables.

        If your value contains commas, prefix the mapping with a different
        delimiter character enclosed between `^` (example 1). Use special
        characters in your shell with caution as they might not work as
        intended or need escaping (example 2 escapes a `$` in Bash).

        Example 1: --set-env-vars ^@^KEY1=ONE,VALUE,WITH,COMMAS@KEY2=VALUE2

        Example 2: --set-env-vars ^$^KEY1=VALUE1\\$KEY2=VALUE,WITH,COMMAS,TOO

        A maximum of 20 user-defined environment variables can be defined.
        Each definition string (`KEY=value`) is limited to 4 KiB.
        All keys and values are converted to strings.
      """
  env_vars_group = parser.add_group(
      mutex=True, hidden=False, help=group_help
  )

  # --set-env-vars: the only flag whose parser is capped at
  # USER_ENV_VARS_LIMIT entries.
  set_help = """\
        Sets environment variables for the workflow based on a comma-separated
        list of key-value pairs. Will overwrite a workflow's existing
        environment variables.

        Example:
        gcloud workflows deploy ${workflow_name} --set-env-vars policy=global,retry_count=5
      """
  set_type = arg_parsers.ArgDict(
      key_type=str,
      value_type=str,
      max_length=USER_ENV_VARS_LIMIT,
  )
  env_vars_group.add_argument(
      '--set-env-vars',
      type=set_type,
      action=arg_parsers.UpdateAction,
      metavar='KEY=VALUE',
      help=set_help,
  )

  # --env-vars-file: key/value pairs read from a local file.
  file_help = """\
        Sets environment variables for the workflow to those stored in a local
        YAML file at the given path. All existing environment variables are
        removed before the new environment variables are added.

        Example:
        gcloud workflows deploy ${workflow_name} --env-vars-file=/path/to/env_vars.yaml

        Inside env_vars.yaml:\n
        policy: global\n
        retry_count: 5 # Service converts this to string "5"
      """
  file_type = map_util.ArgDictFile(key_type=str, value_type=str)
  env_vars_group.add_argument(
      '--env-vars-file',
      metavar='FILE_PATH',
      type=file_type,
      help=file_help,
  )

  # --clear-env-vars: plain boolean switch.
  clear_help = """\
        Clears all user-defined environment variables previously set for the
        workflow.

        Example:
        gcloud workflows deploy ${workflow_name} --clear-env-vars
      """
  env_vars_group.add_argument(
      '--clear-env-vars',
      action='store_true',
      help=clear_help,
  )

  # --remove-env-vars: list of keys.
  remove_help = """\
        Removes user-defined environment variables from a workflow based
        on a list of environment variable keys to be removed.

        Example:
        gcloud workflows deploy ${workflow_name} --remove-env-vars policy,retry_count...
      """
  remove_type = arg_parsers.ArgList(element_type=str)
  env_vars_group.add_argument(
      '--remove-env-vars',
      metavar='KEY',
      action=arg_parsers.UpdateAction,
      type=remove_type,
      help=remove_help,
  )

  # --update-env-vars: key/value pairs, with no max_length on the parser.
  update_help = """\
        Updates existing or adds new user-defined environment variables based
        on a comma-separated list of key-value pairs.

        Example:
        gcloud workflows deploy ${workflow_name} --update-env-vars policy=regional,retry_count=2
      """
  update_type = arg_parsers.ArgDict(key_type=str, value_type=str)
  env_vars_group.add_argument(
      '--update-env-vars',
      type=update_type,
      action=arg_parsers.UpdateAction,
      metavar='KEY=VALUE',
      help=update_help,
  )


def ParseExecution(args):
  """Returns the result of parsing the `execution` concept from the args."""
  execution_concept = args.CONCEPTS.execution
  return execution_concept.Parse()


def ParseExecutionLabels(args):
  """Returns execution labels parsed from the args.

  The labels are built with the Execution.LabelsValue message class of the
  workflowexecutions v1 API.
  """
  client = apis.GetClientInstance('workflowexecutions', 'v1')
  labels_cls = client.MESSAGES_MODULE.Execution.LabelsValue
  return labels_util.ParseCreateArgs(args, labels_cls)


def ParseWorkflow(args):
  """Returns the result of parsing the `workflow` concept from the args."""
  workflow_concept = args.CONCEPTS.workflow
  return workflow_concept.Parse()


def SetSource(args, workflow, updated_fields):
  """Loads the workflow source from --source and records the change.

  Nothing happens when args.source is falsy. Currently only a local source
  file is supported.

  Args:
    args: Args passed to the command.
    workflow: The workflow in which to set the source configuration.
    updated_fields: A list to which 'sourceContents' is appended once the
      file contents have been stored on the workflow.

  Raises:
    exceptions.BadArgumentException: if reading the file raises
      files.MissingFileError.
  """
  if not args.source:
    return
  try:
    workflow.sourceContents = files.ReadFileContents(args.source)
  except files.MissingFileError:
    raise exceptions.BadArgumentException(
        '--source', 'specified file does not exist.'
    )
  else:
    updated_fields.append('sourceContents')


def SetDescription(args, workflow, updated_fields):
  """Copies --description onto the workflow and records the change.

  Nothing happens when args.description is None; an empty string is still
  applied.

  Args:
    args: Args passed to the command.
    workflow: The workflow in which to set the description.
    updated_fields: A list to which 'description' is appended when the
      description is set.
  """
  description = args.description
  if description is None:
    return
  workflow.description = description
  updated_fields.append('description')


def SetServiceAccount(args, workflow, updated_fields):
  """Copies --service-account onto the workflow and records the change.

  Nothing happens when args.service_account is None. A value that does not
  start with 'projects/' gets 'projects/-/serviceAccounts/' prepended.

  Args:
    args: Args passed to the command.
    workflow: The workflow in which to set the service account.
    updated_fields: A list to which 'serviceAccount' is appended when the
      service account is set.
  """
  service_account = args.service_account
  if service_account is None:
    return
  if not service_account.startswith('projects/'):
    service_account = 'projects/-/serviceAccounts/' + service_account
  workflow.serviceAccount = service_account
  updated_fields.append('serviceAccount')


def SetLabels(labels, workflow, updated_fields):
  """Stores the labels on the workflow and records the change.

  Nothing happens when labels is None.

  Args:
    labels: Labels parsed as string to be set on the workflow, or None in case
      the field shouldn't be set.
    workflow: The workflow in which to set the labels.
    updated_fields: A list to which 'labels' is appended when the labels are
      set.
  """
  if labels is None:
    return
  workflow.labels = labels
  updated_fields.append('labels')


def SetUserEnvVars(env_vars, workflow, updated_fields):
  """Replaces the workflow's user-defined environment variables.

  Nothing happens when env_vars is None. When env_vars is the
  CLEAR_ENVIRONMENT sentinel, userEnvVars is set to None; otherwise it is set
  to env_vars as given.

  Args:
    env_vars: Parsed environment variables to be set on the workflow.
    workflow: The workflow in which to set the User Environment Variables.
    updated_fields: A list to which 'userEnvVars' is appended when the field
      is set or cleared.
  """
  if env_vars is None:
    return
  if env_vars is CLEAR_ENVIRONMENT:
    workflow.userEnvVars = None
  else:
    workflow.userEnvVars = env_vars
  updated_fields.append('userEnvVars')


def UpdateUserEnvVars(env_vars, workflow, updated_fields):
  """Builds a UserEnvVarsValue message from a mapping and stores it.

  Nothing happens when env_vars is None. Otherwise the items of env_vars are
  sorted and turned into AdditionalProperty entries of the workflows v1
  Workflow.UserEnvVarsValue message.

  Args:
    env_vars: Parsed environment variables to be set on the workflow.
    workflow: The workflow in which to set the User Environment Variables.
    updated_fields: A list to which 'userEnvVars' is appended when the field
      is set.
  """
  if env_vars is None:
    return
  messages = apis.GetClientInstance('workflows', 'v1').MESSAGES_MODULE
  env_vars_cls = messages.Workflow.UserEnvVarsValue
  entries = []
  for key, value in sorted(env_vars.items()):
    entries.append(env_vars_cls.AdditionalProperty(key=key, value=value))
  workflow.userEnvVars = env_vars_cls(additionalProperties=entries)
  updated_fields.append('userEnvVars')