HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/accesscontextmanager/levels.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command line processing utilities for access levels."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from apitools.base.py import encoding

from googlecloudsdk.api_lib.accesscontextmanager import util
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope.concepts import concepts
from googlecloudsdk.command_lib.accesscontextmanager import common
from googlecloudsdk.command_lib.accesscontextmanager import policies
from googlecloudsdk.command_lib.util.apis import arg_utils
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
import six

COLLECTION = 'accesscontextmanager.accessPolicies.accessLevels'


_INVALID_FORMAT_ERROR = """
Invalid format: {}

The valid fields for the YAML objects in this file type are [{}].

For an access level condition file, an example of the YAML-formatted list of conditions will look like:

 - ipSubnetworks:
   - 162.222.181.197/24
   - 2001:db8::/48
 - members:
   - user:user@example.com

For access levels file, an example of the YAML-formatted list of access levels will look like:

 - name: accessPolicies/my_policy/accessLevels/my_level
   title: My Level
   description: Level for foo.
   basic:
     combiningFunction: AND
     conditions:
     - ipSubnetworks:
       - 192.168.100.14/24
       - 2001:db8::/48
     - members:
       - user1:user1@example.com
"""


class ParseResponseError(exceptions.Error):

  def __init__(self, reason):
    super(ParseResponseError,
          self).__init__('Issue parsing response: {}'.format(reason))


class ParseError(exceptions.Error):

  def __init__(self, path, reason):
    super(ParseError,
          self).__init__('Issue parsing file [{}]: {}'.format(path, reason))


class InvalidFormatError(ParseError):

  def __init__(self, path, reason, message_class):
    valid_fields = [f.name for f in message_class.all_fields()]
    super(InvalidFormatError,
          self).__init__(path, (_INVALID_FORMAT_ERROR).format(
              reason, ', '.join(valid_fields)))


def _LoadData(path):
  try:
    return yaml.load_path(path)
  except yaml.FileLoadError as err:
    raise ParseError(path, 'Problem loading file: {}'.format(err))
  except yaml.YAMLParseError as err:
    raise ParseError(path, 'Problem parsing data as YAML: {}'.format(err))


def _ValidateAllBasicConditionFieldsRecognized(path, conditions):
  unrecognized_fields = set()
  for condition in conditions:
    if condition.all_unrecognized_fields():
      unrecognized_fields.update(condition.all_unrecognized_fields())
  if unrecognized_fields:
    raise InvalidFormatError(
        path,
        'Unrecognized fields: [{}]'.format(', '.join(unrecognized_fields)),
        type(conditions[0]))


def _ValidateAllCustomFieldsRecognized(path, expr):
  if expr.all_unrecognized_fields():
    raise InvalidFormatError(
        path, 'Unrecognized fields: [{}]'.format(', '.join(
            expr.all_unrecognized_fields())), type(expr))


def _ValidateAllLevelFieldsRecognized(path, levels):
  unrecognized_fields = set()
  for level in levels:
    if level.all_unrecognized_fields():
      unrecognized_fields.update(level.all_unrecognized_fields())
  if unrecognized_fields:
    raise InvalidFormatError(
        path,
        'Unrecognized fields: [{}]'.format(', '.join(unrecognized_fields)),
        type(levels[0]))


def ParseReplaceAccessLevelsResponse(api_version):
  """Wrapper around ParseReplaceAccessLevelsResponse to accept api version."""

  def VersionedParseReplaceAccessLevelsResponse(lro, unused_args):
    """Parse the Long Running Operation response of the ReplaceAccessLevels call.

    Args:
      lro: Long Running Operation response of ReplaceAccessLevels.
      unused_args: not used.

    Returns:
      The replacement Access Levels created by the ReplaceAccessLevels call.

    Raises:
      ParseResponseError: if the response could not be parsed into the proper
      object.
    """
    client = util.GetClient(version=api_version)
    operation_ref = resources.REGISTRY.Parse(
        lro.name, collection='accesscontextmanager.operations')
    poller = common.BulkAPIOperationPoller(client.accessPolicies_accessLevels,
                                           client.operations, operation_ref)

    return waiter.WaitFor(
        poller, operation_ref,
        'Waiting for Replace Access Levels operation [{}]'.format(
            operation_ref.Name()))

  return VersionedParseReplaceAccessLevelsResponse


def ParseBasicLevelConditions(api_version):
  """Wrapper around ParseCustomLevel to accept api version."""

  def VersionedParseBasicLevelConditions(path):
    """Parse a YAML representation of basic level conditions.

    Args:
      path: str, path to file containing basic level conditions

    Returns:
      list of Condition objects.

    Raises:
      ParseError: if the file could not be read into the proper object
    """

    data = yaml.load_path(path)
    if not data:
      raise ParseError(path, 'File is empty')

    messages = util.GetMessages(version=api_version)
    message_class = messages.Condition
    try:
      conditions = [encoding.DictToMessage(c, message_class) for c in data]
    except Exception as err:
      raise InvalidFormatError(path, six.text_type(err), message_class)

    _ValidateAllBasicConditionFieldsRecognized(path, conditions)

    return conditions

  return VersionedParseBasicLevelConditions


def ParseCustomLevel(api_version):
  """Wrapper around ParseCustomLevel to accept api version."""

  def VersionedParseCustomLevel(path):
    """Parse a YAML representation of custom level conditions.

    Args:
      path: str, path to file containing custom level expression

    Returns:
      string of CEL expression.

    Raises:
      ParseError: if the file could not be read into the proper object
    """

    data = yaml.load_path(path)
    if not data:
      raise ParseError(path, 'File is empty')

    messages = util.GetMessages(version=api_version)
    message_class = messages.Expr
    try:
      expr = encoding.DictToMessage(data, message_class)
    except Exception as err:
      raise InvalidFormatError(path, six.text_type(err), message_class)

    _ValidateAllCustomFieldsRecognized(path, expr)
    return expr

  return VersionedParseCustomLevel


def ParseAccessLevels(api_version):
  """Wrapper around ParseAccessLevels to accept api version."""

  def VersionedParseAccessLevels(path):
    """Parse a YAML representation of a list of Access Levels with basic/custom level conditions.

    Args:
      path: str, path to file containing basic/custom access levels

    Returns:
      list of Access Level objects.

    Raises:
      ParseError: if the file could not be read into the proper object
    """

    data = yaml.load_path(path)
    if not data:
      raise ParseError(path, 'File is empty')

    messages = util.GetMessages(version=api_version)
    message_class = messages.AccessLevel
    try:
      levels = [encoding.DictToMessage(c, message_class) for c in data]
    except Exception as err:
      raise InvalidFormatError(path, six.text_type(err), message_class)

    _ValidateAllLevelFieldsRecognized(path, levels)
    return levels

  return VersionedParseAccessLevels


def ClearCombiningFunctionUnlessBasicSpecSet(ref, args, req=None):
  """Clear basic field (and default combine function) if spec not provided."""
  del ref  # unused
  if req is None:
    return req

  if not args.IsSpecified('basic_level_spec'):
    req.accessLevel.reset('basic')

  return req


def GetAttributeConfig():
  return concepts.ResourceParameterAttributeConfig(
      name='level', help_text='The ID of the access level.')


def GetResourceSpec():
  return concepts.ResourceSpec(
      'accesscontextmanager.accessPolicies.accessLevels',
      resource_name='level',
      accessPoliciesId=policies.GetAttributeConfig(),
      accessLevelsId=GetAttributeConfig())


def AddResourceArg(parser, verb):
  """Add a resource argument for an access level.

  NOTE: Must be used only if it's the only resource arg in the command.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  concept_parsers.ConceptParser.ForResource(
      'level',
      GetResourceSpec(),
      'The access level {}.'.format(verb),
      required=True).AddToParser(parser)


def AddResourceFlagArg(parser, verb):
  """Add a resource argument for an access level.

  NOTE: Must be used only if it's the only resource arg in the command.

  Args:
    parser: the parser for the command.
    verb: str, the verb to describe the resource, such as 'to update'.
  """
  concept_parsers.ConceptParser.ForResource(
      '--level',
      GetResourceSpec(),
      'The access level {}.'.format(verb),
      required=True).AddToParser(parser)


def GetCombineFunctionEnumMapper(api_version=None):
  return arg_utils.ChoiceEnumMapper(
      '--combine-function',
      util.GetMessages(
          version=api_version).BasicLevel.CombiningFunctionValueValuesEnum,
      custom_mappings={
          'AND': 'and',
          'OR': 'or'
      },
      required=False,
      help_str='For a basic level, determines how conditions are combined.',
  )


def AddLevelArgs(parser):
  """Add common args for level create/update commands."""
  args = [
      common.GetDescriptionArg('access level'),
      common.GetTitleArg('access level'),
  ]
  for arg in args:
    arg.AddToParser(parser)


def AddBasicSpecArgs(parser, api_version):
  """Add args for basic spec (with no custom spec)."""
  basic_level_help_text = (
      'Path to a file containing a list of basic access level conditions.\n\n'
      'An access level condition file is a YAML-formatted list of conditions, '
      'which are YAML objects representing a Condition as described in the API '
      'reference. For example:\n\n'
      '    ```\n'
      '     - ipSubnetworks:\n'
      '       - 162.222.181.197/24\n'
      '       - 2001:db8::/48\n'
      '     - members:\n'
      '       - user:user@example.com\n'
      '    ```')
  basic_level_spec_arg = base.Argument(
      '--basic-level-spec',
      help=basic_level_help_text,
      type=ParseBasicLevelConditions(api_version))
  basic_level_combine_arg = GetCombineFunctionEnumMapper(
      api_version=api_version).choice_arg

  basic_level_spec_arg.AddToParser(parser)
  basic_level_combine_arg.AddToParser(parser)


def AddBasicAndCustomSpecArgs(parser, api_version):
  """Add args for basic and custom specs (grouped together)."""
  basic_level_help_text = (
      'Path to a file containing a list of basic access level conditions.\n\n'
      'An access level condition file is a YAML-formatted list of conditions,'
      'which are YAML objects representing a Condition as described in the API '
      'reference. For example:\n\n'
      '    ```\n'
      '     - ipSubnetworks:\n'
      '       - 162.222.181.197/24\n'
      '       - 2001:db8::/48\n'
      '     - members:\n'
      '       - user:user@example.com\n'
      '    ```')
  custom_level_help_text = (
      'Path to a file representing an expression for an access level.\n\n'
      'The expression is in the Common Expression Langague (CEL) format.'
      'For example:\n\n'
      '    ```\n'
      '     expression: "origin.region_code in [\'US\', \'CA\']"\n'
      '    ```')

  basic_level_spec_arg = base.Argument(
      '--basic-level-spec',
      help=basic_level_help_text,
      type=ParseBasicLevelConditions(api_version))
  basic_level_combine_arg = GetCombineFunctionEnumMapper(
      api_version=api_version).choice_arg

  basic_level_spec_group = base.ArgumentGroup(help='Basic level specification.')
  basic_level_spec_group.AddArgument(basic_level_spec_arg)
  basic_level_spec_group.AddArgument(basic_level_combine_arg)

  custom_level_spec_arg = base.Argument(
      '--custom-level-spec',
      help=custom_level_help_text,
      type=ParseCustomLevel(api_version))

  # Custom level spec group only consists of a single argument.
  # This is done so help text between basic/custom specs is consistent.
  custom_level_spec_group = base.ArgumentGroup(
      help='Custom level specification.')
  custom_level_spec_group.AddArgument(custom_level_spec_arg)

  level_spec_group = base.ArgumentGroup(help='Level specification.', mutex=True)

  level_spec_group.AddArgument(basic_level_spec_group)
  level_spec_group.AddArgument(custom_level_spec_group)

  level_spec_group.AddToParser(parser)


def AddLevelSpecArgs(parser, api_version=None, feature_mask=None):
  """Add arguments for in-file level specifications."""
  if feature_mask is None:
    feature_mask = {}

  if feature_mask.get('custom_levels', False):
    AddBasicAndCustomSpecArgs(parser, api_version)
  else:
    AddBasicSpecArgs(parser, api_version)