HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/scc/manage/parsing.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common flag parsing for management gcloud."""
import json
import re

from apitools.base.py import encoding
from googlecloudsdk.api_lib.resource_manager import folders
from googlecloudsdk.command_lib.scc.manage import constants
from googlecloudsdk.command_lib.scc.manage import errors
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.generated_clients.apis.securitycentermanagement.v1 import securitycentermanagement_v1_messages as messages

_CUSTOM_MODULE_ID_REGEX = re.compile('[0-9]{1,20}')


def GetParentResourceNameFromArgs(args) -> str:
  """Returns the relative path to the parent from args.

  Args:
    args: command line args.

  Returns:
    The relative path. e.g. 'projects/foo/locations/global',
    'folders/1234/locations/global'.
  """
  if args.parent:
    return f'{_ParseParent(args.parent).RelativeName()}/locations/global'

  return f'{_GetParentResourceFromArgs(args).RelativeName()}/locations/global'


def _GetParentResourceFromArgs(args):
  if args.organization:
    return resources.REGISTRY.Parse(
        args.organization, collection='cloudresourcemanager.organizations'
    )
  elif args.folder:
    return folders.FoldersRegistry().Parse(
        args.folder, collection='cloudresourcemanager.folders'
    )
  else:
    return resources.REGISTRY.Parse(
        args.project or properties.VALUES.core.project.Get(required=True),
        collection='cloudresourcemanager.projects',
    )


def GetServiceNameFromArgs(args) -> str:
  """Returns the specified service name from args if it exists.

  Otherwise, an exception is raised detailing the parsing error along with the
  expectation.

  Args:
    args: The argument input as the gcloud command.

  Raises:
    InvalidServiceNameError: the specified service name was invalid.
  """

  parent = GetParentResourceNameFromArgs(args)

  maybe_service_name_or_abbr = args.service_name.lower()
  service = constants.SERVICE_INVENTORY.get(maybe_service_name_or_abbr)

  if service:
    return f'{parent}/{constants.SERVICE_RESOURCE_PLURAL_NAME}/{service.name}'
  else:
    raise errors.InvalidServiceNameError(args.service_name)


def GetModuleIdFromArgs(args) -> str:
  """Returns the module id from args."""
  if not args.module_id_or_name:
    raise errors.InvalidCustomModuleIdError(None)

  match = _CUSTOM_MODULE_ID_REGEX.fullmatch(args.module_id_or_name)

  if match:
    return match[0]
  else:
    raise errors.InvalidCustomModuleIdError(args.module_id_or_name)


def GetModuleNameFromArgs(args, module_type: constants.CustomModuleType) -> str:
  """Returns the specified module name from args if it exists.

  Otherwise, an exception is raised detailing the parsing error along with the
  expectation.

  Args:
    args: the args
    module_type: the module type (see
      googlecloudsdk.command_lib.scc.manage.constants)

  Raises:
    MissingCustomModuleNameOrIdError: no module name or id was specified.
    InvalidCustomModuleNameError: the specified module name was invalid.
    InvalidCustomModuleIdError: the specified module id was invalid.
  """

  if not args.module_id_or_name:
    raise errors.MissingCustomModuleNameOrIdError()

  # First try to see if we can parse a resource name
  collections = [
      f'securitycentermanagement.organizations.locations.{module_type}',
      f'securitycentermanagement.projects.locations.{module_type}',
      f'securitycentermanagement.folders.locations.{module_type}',
  ]

  is_possible_resource_name = (
      _IsPossibleResourceName(args.module_id_or_name)
      or len(args.GetSpecifiedArgNames()) == 1
  )

  for collection in collections:
    try:
      return resources.REGISTRY.Parse(
          args.module_id_or_name, collection=collection
      ).RelativeName()
    except resources.RequiredFieldOmittedException:
      pass

  if is_possible_resource_name:
    # The error messages provided by the default gcloud parsing are awful so we
    # detect a resource name misformatting here and print a better error
    raise errors.InvalidCustomModuleNameError(
        args.module_id_or_name, module_type
    )

  parent = GetParentResourceNameFromArgs(args)
  module_id = GetModuleIdFromArgs(args)

  return f'{parent}/{module_type}/{module_id}'


def _ParseParent(parent: str) -> str:
  """Extracts parent name from a string of the form {organizations|projects|folders}/<id>."""

  if parent.startswith('organizations/'):
    return resources.REGISTRY.Parse(
        parent, collection='cloudresourcemanager.organizations'
    )
  elif parent.startswith('folders/'):
    return folders.FoldersRegistry().Parse(
        parent, collection='cloudresourcemanager.folders'
    )
  elif parent.startswith('projects/'):
    return resources.REGISTRY.Parse(
        parent,
        collection='cloudresourcemanager.projects',
    )
  else:
    raise errors.InvalidParentError(parent)


def _IsPossibleResourceName(name: str) -> bool:
  return (
      name.startswith('organizations')
      or name.startswith('projects')
      or name.startswith('folders')
  )


def GetCustomConfigFromArgs(file):
  """Process the custom config file for the custom module."""
  if file is not None:
    try:
      config_dict = yaml.load(file)
      return encoding.DictToMessage(config_dict, messages.CustomConfig)
    except yaml.YAMLParseError as ype:
      raise errors.InvalidCustomConfigFileError(
          'Error parsing custom config file [{}]'.format(ype)
      )


def GetTestResourceFromArgs(file):
  """Process the test resource data file for the custom module to test against."""
  try:
    resource_dict = yaml.load(file)

    return encoding.DictToMessage(resource_dict, messages.SimulatedResource)
  except yaml.YAMLParseError as ype:
    raise errors.InvalidResourceFileError(
        'Error parsing resource file [{}]'.format(ype)
    )


def GetConfigValueFromArgs(file):
  """Process the config custom file for the custom module."""
  if file is not None:
    try:
      config = json.loads(file)
      return encoding.DictToMessage(
          config, messages.EventThreatDetectionCustomModule.ConfigValue
      )
    except json.JSONDecodeError as e:
      raise errors.InvalidConfigValueFileError(
          'Error parsing config value file [{}]'.format(e)
      )
  else:
    return None


def ParseJSONFile(file):
  """Converts the contents of a JSON file into a string."""
  if file is not None:
    try:
      config = json.loads(file)
      return json.dumps(config)
    except json.JSONDecodeError as e:
      raise errors.InvalidConfigValueFileError(
          'Error parsing config value file [{}]'.format(e)
      )
  else:
    return None


def GetEnablementStateFromArgs(
    enablement_state: str,
    module_type: constants.CustomModuleType
):
  """Parse the enablement state."""
  if module_type == constants.CustomModuleType.SHA:
    state_enum = (
        messages.SecurityHealthAnalyticsCustomModule.EnablementStateValueValuesEnum
    )
  elif module_type == constants.CustomModuleType.ETD:
    state_enum = (
        messages.EventThreatDetectionCustomModule.EnablementStateValueValuesEnum
    )
  else:
    raise errors.InvalidModuleTypeError(
        f'Module type "{module_type}" is not a valid module type.'
    )

  if enablement_state is None:
    raise errors.InvalidEnablementStateError(
        'Error parsing enablement state. Enablement state cannot be empty.'
    )

  state = enablement_state.upper()

  if state == 'ENABLED':
    return state_enum.ENABLED
  elif state == 'DISABLED':
    return state_enum.DISABLED
  elif state == 'INHERITED':
    return state_enum.INHERITED
  else:
    raise errors.InvalidEnablementStateError(
        f'Error parsing enablement state. "{state}" is not a valid enablement'
        ' state. Please provide one of ENABLED, DISABLED, or INHERITED.'
    )


def CreateUpdateMaskFromArgs(args):
  """Create an update mask with the args given."""
  if args.enablement_state is not None and args.custom_config_file is not None:
    return 'enablement_state,custom_config'
  elif args.enablement_state is not None:
    return 'enablement_state'
  elif args.custom_config_file is not None:
    return 'custom_config'
  else:
    raise errors.InvalidUpdateMaskInputError(
        'Error parsing Update Mask. Either a custom configuration or an'
        ' enablement state (or both) must be provided to update the custom'
        ' module.'
    )


def GetModuleConfigValueFromArgs(file: str):
  """Process the module config file for the service."""
  if file is not None:
    try:
      config = yaml.load(file)
      return encoding.DictToMessage(
          config, messages.SecurityCenterService.ModulesValue
      )
    except (yaml.YAMLParseError, AttributeError) as ype:
      raise errors.InvalidConfigValueFileError(
          f'Error parsing config value file [{ype}]'
      )
  else:
    return None


def GetServiceEnablementStateFromArgs(enablement_state: str):
  """Parse the service enablement state."""
  state_enum = (
      messages.SecurityCenterService.IntendedEnablementStateValueValuesEnum
  )

  if enablement_state is None:
    return None

  state = enablement_state.upper()
  if state == 'ENABLED':
    return state_enum.ENABLED
  elif state == 'DISABLED':
    return state_enum.DISABLED
  elif state == 'INHERITED':
    return state_enum.INHERITED
  else:
    raise errors.InvalidEnablementStateError(
        f'Error parsing enablement state. "{state}" is not a valid enablement'
        ' state. Please provide one of ENABLED, DISABLED, or INHERITED.'
    )


def CreateUpdateMaskFromArgsForService(args):
  """Create an update mask with the args given for the given service."""
  if args.enablement_state is not None and args.module_config_file is not None:
    return 'intended_enablement_state,modules'
  elif args.enablement_state is not None:
    return 'intended_enablement_state'
  elif args.module_config_file is not None:
    return 'modules'
  else:
    raise errors.InvalidUpdateMaskInputError(
        'Error parsing Update Mask. Either a module configuration or an'
        ' enablement state (or both) must be provided to update the service.'
    )


def GetModuleListFromArgs(args) -> {str}:
  """Returns a list of module names from args."""

  if not args.filter_modules:
    return []

  modules = args.filter_modules.strip('[]')
  modules_list = modules.split(',')
  modules_set = {module.strip() for module in modules_list}

  return modules_set


def GetModuleNamePathFromArgs(
    args, module_type: constants.CustomModuleType
) -> str:
  """Returns the specified module name path from args if it exists.

  Args:
    args: command line args.
    module_type: the module type (see
      googlecloudsdk.command_lib.scc.manage.constants)

  Returns:
    The relative path. e.g.
    'organizations/1234/locations/global/{module_type}',
    'projects/foo/locations/global/{module_type}'.
  """
  if args.parent:
    return (
        f'{_ParseParentFlag(args.parent).RelativeName()}/locations/global/'
        f'{module_type}'
    )

  return (
      f'{_GetParentResourceFromArg(args).RelativeName()}/locations/global/'
      f'{module_type}'
  )


def _ParseParentFlag(parent: str) -> str:
  """Extracts parent name from {organizations|projects}/<id>.

  Args:
    parent: The parent string to parse.

  Returns:
    The relative path of the parent.

  Raises:
    InvalidParentFlagError: The provided parent string is invalid.
  """

  if parent.startswith('organizations/'):
    return resources.REGISTRY.Parse(
        parent, collection='cloudresourcemanager.organizations'
    )
  if parent.startswith('projects/'):
    return resources.REGISTRY.Parse(
        parent,
        collection='cloudresourcemanager.projects',
    )

  raise errors.InvalidParentFlagError(parent)


def _GetParentResourceFromArg(args):
  """Returns the parent resource from the given args.

  Args:
    args: command line args.

  Returns:
    The parent resource.
  """
  if args.organization:
    return resources.REGISTRY.Parse(
        args.organization, collection='cloudresourcemanager.organizations'
    )
  return resources.REGISTRY.Parse(
      args.project or properties.VALUES.core.project.Get(required=True),
      collection='cloudresourcemanager.projects',
    )