HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/scc/findings/util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shared utility functions for Cloud SCC findings commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import re

from apitools.base.py import encoding
from googlecloudsdk.api_lib.scc import securitycenter_client
from googlecloudsdk.command_lib.scc import errors
from googlecloudsdk.command_lib.scc import util as scc_util
from googlecloudsdk.core.util import times


def ValidateMutexOnFindingAndSourceAndOrganization(args):
  """Validates that only a full resource name or split arguments are provided."""
  if "/" in args.finding and (
      args.IsKnownAndSpecified("organization")
      or args.IsKnownAndSpecified("source")
  ):
    raise errors.InvalidSCCInputError(
        "Only provide a full resource name "
        "(organizations/123/sources/456/findings/789) or an --organization flag"
        " and --sources flag, not both."
    )


def GetFullFindingName(args, version):
  """Returns relative resource name for a finding name.

  Args:
    args: Argument namespace.
    version: Api version.

  Returns:
    Relative resource name
    if no location is specified the result will be one of the following forms
      `organizations/{organization_id}/sources/{source_id}/finding/{finding_id}`
      `folders/{folders_id}/sources/{source_id}/finding/{finding_id}`
      `projects/{projects_id}/sources/{source_id}/finding/{finding_id}`
    if a location is specified the result will be one of the following forms
      `organizations/{organization_id}/sources/{source_id}/locations/{location_id}/finding/{finding_id}`
      `folders/{folders_id}/sources/{source_id}/locations/{location_id}/finding/{finding_id}`
      `projects/{projects_id}/sources/{source_id}/locations/{location_id}/finding/{finding_id}`
  """
  resource_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9-]+/findings/[a-zA-Z0-9]+$"
  )
  region_resource_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9-]+/locations/.*/findings/[a-zA-Z0-9]+$"
  )
  id_pattern = re.compile("^[a-zA-Z0-9]+$")
  if region_resource_pattern.match(args.finding):
    # Handle finding id as full resource name.
    return args.finding
  if resource_pattern.match(args.finding):
    if version == "v2":
      return GetRegionalizedResourceName(args, version)
    return args.finding
  if id_pattern.match(args.finding):
    return f"{GetFullSourceName(args, version)}/findings/{args.finding}"

  raise errors.InvalidSCCInputError(
      "Finding must match either the full resource name or only the finding id."
  )


def GetFullSourceName(args, version):

  """Returns relative resource name for a source from --source argument.

  Args:
    args: Argument namespace.
    version: Api version.

  Returns:
    Relative resource name
    if args.source is not provided an exception will be raised
    if no location is specified in argument: sources/{source_id}
    if a location is specified: sources/{source_id}/locations/{location_id}
  """

  resource_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9-]+"
  )
  region_resource_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9-]+/locations/[a-zA-Z0-9-]+$"
  )
  id_pattern = re.compile("[0-9-]+")

  if not args.source:
    raise errors.InvalidSCCInputError(
        "Finding source must be provided in --source flag or full resource"
        " name."
    )

  if region_resource_pattern.match(args.source):
    return args.source

  location = scc_util.ValidateAndGetLocation(args, version)
  if resource_pattern.match(args.source):
    source = args.source
    if version == "v2":
      return f"{source}/locations/{location}"
    return source

  if (id_pattern.match(args.source) and
      (hasattr(args, "finding") or hasattr(args, "parent"))):
    source = f"{scc_util.GetFindingsParentFromPositionalArguments(args)}/sources/{args.source}"
    if version == "v2":
      return f"{source}/locations/{location}"
    return source

  if id_pattern.match(args.source):
    source = f"{scc_util.GetParentFromPositionalArguments(args)}/sources/{args.source}"
    if version == "v2":
      return f"{source}/locations/{location}"
    return source

  raise errors.InvalidSCCInputError(
      "The source must either be the full resource "
      "name or the numeric source ID."
  )


def GetSourceParentFromFindingName(resource_name, version):
  """Get parent (with source) from Finding name i.e remove 'findings/{finding_name}'.

  Args:
    resource_name: finding name {parent with source}/findings/{findingID}
    version: API version.

  Returns:
    The parent with source or parent with source and location
    examples:
    if no location is specified the result will be one of the following forms
      `organizations/{organization_id}/sources/{source_id}`
      `folders/{folders_id}/sources/{source_id}`
      `projects/{projects_id}/sources/{source_id}`
    if a location is specified the result will be one of the following forms
      `organizations/{organization_id}/sources/{source_id}/locations/{location_id}`
      `folders/{folders_id}/sources/{source_id}/locations/{location_id}`
      `projects/{projects_id}/sources/{source_id}/locations/{location_id}`
  """
  resource_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9]+"
  )
  if not resource_pattern.match(resource_name):
    raise errors.InvalidSCCInputError(
        "When providing a full resource path, it must also include "
        "the organization, project, or folder prefix."
    )
  list_source_components = resource_name.split("/")
  if version == "v1":
    return f"{GetParentFromResourceName(resource_name)}/{list_source_components[2]}/{list_source_components[3]}"
  if version == "v2":
    # Include location.
    return f"{GetParentFromResourceName(resource_name)}/{list_source_components[2]}/{list_source_components[3]}/{list_source_components[4]}/{list_source_components[5]}"


def GetFindingIdFromName(finding_name):
  """Gets a finding id from the full resource name."""
  resource_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9-]+/findings/[a-zA-Z0-9]+$"
  )
  region_resource_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9-]+/locations/.*/findings/[a-zA-Z0-9]+$"
  )
  if not resource_pattern.match(
      finding_name
  ) and not region_resource_pattern.match(finding_name):
    raise errors.InvalidSCCInputError(
        "When providing a full resource path, it must include the pattern "
        "organizations/[0-9]+/sources/[0-9-]+/findings/[a-zA-Z0-9]+."
    )
  list_finding_components = finding_name.split("/")
  return list_finding_components[len(list_finding_components) - 1]


def GetParentFromResourceName(resource_name):
  list_organization_components = resource_name.split("/")
  return f"{list_organization_components[0]}/{list_organization_components[1]}"


def ConvertStateInput(state, version):
  """Convert state input to messages.Finding.StateValueValuesEnum object."""
  messages = securitycenter_client.GetMessages(version)
  if state:
    state = state.upper()

  state_dict = {}
  if version == "v1":
    unspecified_state = messages.Finding.StateValueValuesEnum.STATE_UNSPECIFIED
    state_dict["v1"] = {
        "ACTIVE": messages.Finding.StateValueValuesEnum.ACTIVE,
        "INACTIVE": messages.Finding.StateValueValuesEnum.INACTIVE,
        "STATE_UNSPECIFIED": unspecified_state,
    }
  else:
    v2_unspecified_state = (
        messages.GoogleCloudSecuritycenterV2Finding.StateValueValuesEnum.STATE_UNSPECIFIED
    )
    state_dict["v2"] = {
        "ACTIVE": (
            messages.GoogleCloudSecuritycenterV2Finding.StateValueValuesEnum.ACTIVE
        ),
        "INACTIVE": (
            messages.GoogleCloudSecuritycenterV2Finding.StateValueValuesEnum.INACTIVE
        ),
        "STATE_UNSPECIFIED": v2_unspecified_state,
    }
  return state_dict[version].get(
      state, state_dict[version]["STATE_UNSPECIFIED"]
  )


def ConvertMuteStateInput(mute_state, messages):
  """Convert mute state input to messages.BulkMuteFindingsRequest.MuteStateValueValuesEnum object."""
  if mute_state == "muted":
    return messages.BulkMuteFindingsRequest.MuteStateValueValuesEnum.MUTED
  elif mute_state == "undefined":
    return messages.BulkMuteFindingsRequest.MuteStateValueValuesEnum.UNDEFINED
  raise errors.InvalidSCCInputError(
      "Mute state must be one of [muted, undefined]."
  )


def ValidateAndGetParent(args):
  """Validates parent."""
  if args.organization is not None:  # Validates organization.
    if "/" in args.organization:
      pattern = re.compile("^organizations/[0-9]{1,19}$")
      if not pattern.match(args.organization):
        raise errors.InvalidSCCInputError(
            "When providing a full resource path, it must include the pattern "
            "'^organizations/[0-9]{1,19}$'."
        )
      else:
        return args.organization
    else:
      pattern = re.compile("^[0-9]{1,19}$")
      if not pattern.match(args.organization):
        raise errors.InvalidSCCInputError(
            "Organization does not match the pattern '^[0-9]{1,19}$'."
        )
      else:
        return f"organizations/{args.organization}"

  if args.folder is not None:  # Validates folder.
    if "/" in args.folder:
      pattern = re.compile("^folders/.*$")
      if not pattern.match(args.folder):
        raise errors.InvalidSCCInputError(
            "When providing a full resource path, it must include the pattern "
            "'^folders/.*$'."
        )
      else:
        return args.folder
    else:
      return f"folders/{args.folder}"

  if args.project is not None:  # Validates project.
    if "/" in args.project:
      pattern = re.compile("^projects/.*$")
      if not pattern.match(args.project):
        raise errors.InvalidSCCInputError(
            "When providing a full resource path, it must include the pattern "
            "'^projects/.*$'."
        )
      else:
        return args.project
    else:
      return f"projects/{args.project}"


def ValidateMutexOnSourceAndParent(args):
  """Validates that only a full resource name or split arguments are provided."""
  if "/" in args.source and args.parent is not None:
    raise errors.InvalidSCCInputError(
        "Only provide a full resource name "
        "(organizations/123/sources/456) or a --parent flag, not both."
    )


def ExtractSecurityMarksFromResponse(response, args):
  """Returns security marks from finding response."""
  del args
  if isinstance(response, list):
    list_finding_response = response
  else:
    list_finding_response = list(response)
  if len(list_finding_response) > 1:
    raise errors.InvalidSCCInputError(
        "ListFindingResponse must only return one finding since it is "
        "filtered by Finding Name."
    )
  for finding_result in list_finding_response:
    return finding_result.finding.securityMarks


def ValidateSourceAndFindingIdIfParentProvided(args):
  """Validates that source and finding id are provided if parent is provided."""
  if args.source is None:
    raise errors.InvalidSCCInputError("--source flag must be provided.")
  if "/" in args.finding:
    raise errors.InvalidSCCInputError(
        "Finding id must be provided, instead of the full resource name."
    )


def ValidateLocationAndGetRegionalizedParent(args, parent):
  """Appends location to parent."""
  if args.location:
    if "/" in args.location:
      pattern = re.compile("^locations/[A-Za-z0-9-]{0,61}$")
      if not pattern.match(args.location):
        raise errors.InvalidSCCInputError(
            "When providing a full resource path, it must include the pattern "
            "'^locations/.*$'."
        )
      else:
        return f"{parent}/{args.location}"
    else:
      return f"{parent}/locations/{args.location}"


def GetRegionalizedResourceName(args, version):
  """Returns regionalized resource name."""
  location = scc_util.ValidateAndGetLocation(args, version)
  name_components = args.finding.split("/")
  return f"{name_components[0]}/{name_components[1]}/{name_components[2]}/{name_components[3]}/locations/{location}/{name_components[4]}/{name_components[5]}"


def ConvertSourceProperties(source_properties_dict, version):
  """Hook to capture "key1=val1,key2=val2" as SourceProperties object."""
  messages = securitycenter_client.GetMessages(version)
  if version == "v1":
    return encoding.DictToMessage(
        source_properties_dict, messages.Finding.SourcePropertiesValue
    )
  elif version == "v2":
    return encoding.DictToMessage(
        source_properties_dict,
        messages.GoogleCloudSecuritycenterV2Finding.SourcePropertiesValue,
    )
  raise errors.InvalidAPIVersion("Invalid API version")


def GetApiVersionUsingDeprecatedArgs(args, deprecated_args):
  """Determine what version to call from --location and --api-version."""
  if not args.parent:
    # If the parent argument is not provided in the command, it can be derived
    # from properties set by gcloud config.
    parent = scc_util.GetParentFromPositionalArguments(args)
  else:
    parent = args.parent
  return scc_util.GetVersionFromArguments(args, parent, deprecated_args)


def ValidateAndFormatExportTime(export_time):
  """Validates the export time."""
  try:
    read_time_dt = times.ParseDateTime(export_time)
    return times.FormatDateTime(read_time_dt)
  except (times.DateTimeSyntaxError, times.DateTimeValueError) as e:
    raise errors.InvalidSCCInputError(
        "Invalid export time format. Please provide a valid date/time. Example:"
        " 2024-08-20T12:00:00Z"
    ) from e


def ValidateDataset(dataset):
  """Validates the dataset."""
  dataset_pattern = re.compile(
      "^projects/[a-zA-Z0-9-]+/datasets/[a-zA-Z0-9_]+$"
  )
  if not dataset_pattern.match(dataset):
    raise errors.InvalidSCCInputError(
        "Dataset must match the pattern"
        " projects/[a-zA-Z0-9-]+/datasets/[a-zA-Z0-9_]+."
    )
  return dataset