HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/scc/hooks.py
# -*- coding: utf-8 -*- #
# Copyright 2019 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Declarative Hooks for Cloud SCC surface arguments."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import json
import re

from apitools.base.py import encoding
from googlecloudsdk.api_lib.scc import securitycenter_client as sc_client
from googlecloudsdk.command_lib.scc.errors import InvalidSCCInputError
from googlecloudsdk.command_lib.util.apis import yaml_data
from googlecloudsdk.command_lib.util.args import resource_args
from googlecloudsdk.command_lib.util.concepts import concept_parsers
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import properties
from googlecloudsdk.core import yaml


class InvalidCustomConfigFileError(core_exceptions.Error):
  """Error if a custom config file is improperly formatted.

  Raised by the file-processing hooks in this module when a custom config
  file (JSON or YAML) or a Terraform plan file cannot be parsed.
  """


class InvalidTestDataFileError(core_exceptions.Error):
  """Error if a test data file is improperly formatted.

  Raised in this module when the test data file fails YAML parsing or
  contains no data records.
  """


class InvalidResourceFileError(core_exceptions.Error):
  """Error if a resource file is improperly formatted.

  Raised in this module when a simulated resource file fails YAML parsing.
  """


def AppendOrgArg():
  """Build the parser for a required, positional `organization` resource.

  The resource definition is loaded from the "scc.organization" YAML data.

  Returns:
    A one-element list containing the ConceptParser for the organization.
  """
  org_yaml = yaml_data.ResourceYAMLData.FromPath("scc.organization")
  organization_spec = resource_args.GetResourcePresentationSpec(
      verb="to be used for the SCC (Security Command Center) command",
      name="organization",
      required=True,
      prefixes=False,
      positional=True,
      resource_data=org_yaml.GetData())
  organization_parser = concept_parsers.ConceptParser([organization_spec], [])
  return [organization_parser]


def AppendParentArg():
  """Build the parser for a required, positional `parent` resource.

  The resource definition is loaded from the "scc.parent" YAML data.

  Returns:
    A one-element list containing the ConceptParser for the parent.
  """
  parent_yaml = yaml_data.ResourceYAMLData.FromPath("scc.parent")
  parent_spec = resource_args.GetResourcePresentationSpec(
      verb="to be used for the `gcloud scc` command",
      name="parent",
      help_text="{name} organization, folder, or project in the Google Cloud resource hierarchy {verb}. Specify the argument as either [RESOURCE_TYPE/RESOURCE_ID] or [RESOURCE_ID], as shown in the preceding examples.",
      required=True,
      prefixes=False,
      positional=True,
      resource_data=parent_yaml.GetData())
  parent_parser = concept_parsers.ConceptParser([parent_spec], [])
  return [parent_parser]


def SourcePropertiesHook(source_properties_dict):
  """Convert parsed "key1=val1,key2=val2" input to a SourceProperties message.

  Args:
    source_properties_dict: Dict of source property names to values.

  Returns:
    The dict encoded as a Finding.SourcePropertiesValue message.
  """
  scc_messages = sc_client.GetMessages()
  value_type = scc_messages.Finding.SourcePropertiesValue
  return encoding.DictToMessage(source_properties_dict, value_type)


def SecurityMarksHook(parsed_dict):
  """Convert parsed "key1=val1,key2=val2" input to a SecurityMarks message.

  Args:
    parsed_dict: Dict of mark names to values.

  Returns:
    A SecurityMarks message whose `marks` field holds the encoded dict.
  """
  scc_messages = sc_client.GetMessages()
  result = scc_messages.SecurityMarks()
  marks_type = scc_messages.SecurityMarks.MarksValue
  result.marks = encoding.DictToMessage(parsed_dict, marks_type)
  return result


def GetOrganization(args):
  """Return the organization as "organizations/<id>", prefixing if necessary.

  The organization is taken from `args.organization` when it is set (truthy);
  otherwise the `scc/organization` property is used.

  Args:
    args: Parsed arguments exposing an `organization` attribute.

  Returns:
    The organization resource name in the form "organizations/[0-9]+".

  Raises:
    InvalidSCCInputError: If no organization can be found, or if the value is
      not exactly of the form organizations/[0-9]+ or [0-9]+.
  """
  resource_pattern = re.compile("organizations/[0-9]+")
  id_pattern = re.compile("[0-9]+")
  organization = (
      args.organization or properties.VALUES.scc.organization.Get())
  if organization is None:
    raise InvalidSCCInputError("Could not find Organization argument. Please "
                               "provide the organization argument.")
  # fullmatch (not match) so that trailing junk such as "123abc" or
  # "organizations/1/extra" is rejected instead of being passed through.
  if resource_pattern.fullmatch(organization):
    return organization
  if id_pattern.fullmatch(organization):
    return "organizations/" + organization
  raise InvalidSCCInputError(
      "Organization must match either organizations/[0-9]+ or [0-9]+.")


def GetDefaultOrganization():
  """Return the `scc/organization` property as "organizations/<id>".

  Returns:
    The organization resource name in the form "organizations/[0-9]+".

  Raises:
    InvalidSCCInputError: If the property is unset, or if its value is not
      exactly of the form organizations/[0-9]+ or [0-9]+.
  """
  resource_pattern = re.compile("organizations/[0-9]+")
  id_pattern = re.compile("[0-9]+")
  organization = properties.VALUES.scc.organization.Get()
  # An unset property yields None; report it as an input error rather than
  # letting the regex match raise a TypeError.
  if organization is None:
    raise InvalidSCCInputError("Could not find Organization argument. Please "
                               "provide the organization argument.")
  # fullmatch (not match) so that trailing junk such as "123abc" is rejected.
  if resource_pattern.fullmatch(organization):
    return organization
  if id_pattern.fullmatch(organization):
    return "organizations/" + organization
  raise InvalidSCCInputError(
      "Organization must match either organizations/[0-9]+ or [0-9]+.")


def GetDefaultParent():
  """Converts the `scc/parent` property to an organization, project or folder.

  A value that starts with digits is treated as an organization id and is
  prefixed with "organizations/" before validation.

  Returns:
    The parent as "organizations/[0-9]+", "projects/..." or "folders/...".

  Raises:
    InvalidSCCInputError: If the property is unset or does not match one of
      the accepted forms.
  """
  organization_resource_pattern = re.compile("organizations/[0-9]+$")
  id_pattern = re.compile("[0-9]+")
  parent = properties.VALUES.scc.parent.Get()
  # An unset property yields None; report it as an input error rather than
  # letting the regex match raise a TypeError.
  if parent is None:
    raise InvalidSCCInputError(
        "Could not find Parent argument. Please provide the parent argument.")
  if id_pattern.match(parent):
    # Prepend organizations/ if only number value is provided.
    parent = "organizations/" + parent
  if not (organization_resource_pattern.match(parent) or
          parent.startswith(("projects/", "folders/"))):
    # NOTE: the message text (including its embedded newline and indentation)
    # is kept exactly as before.
    raise InvalidSCCInputError(
        "Parent must match either [0-9]+, organizations/[0-9]+, projects/.*\n"
        "      or folders/.*.")
  return parent


def CleanUpUserInput(mask):
  """Strip every space character from a user-supplied field mask.

  Args:
    mask: The field mask string as typed by the user.

  Returns:
    The mask with all " " characters removed; other whitespace is kept.
  """
  pieces = mask.split(" ")
  return "".join(pieces)


def GetParentFromResourceName(resource_name):
  """Extract the "<collection>/<id>" parent prefix from a full resource name.

  Args:
    resource_name: Full resource path starting with organizations/, projects/
      or folders/.

  Returns:
    The first two "/"-separated components joined by "/", for example
    "organizations/123" for "organizations/123/sources/456".

  Raises:
    InvalidSCCInputError: If resource_name does not start with one of the
      expected prefixes.
  """
  resource_pattern = re.compile("(organizations|projects|folders)/.*")
  if not resource_pattern.match(resource_name):
    raise InvalidSCCInputError(
        "When providing a full resource path, it must also include "
        "the organization, project, or folder prefix.")
  # The pattern guarantees at least one "/", so two components always exist.
  collection, resource_id = resource_name.split("/")[:2]
  return collection + "/" + resource_id


def GetSourceParentFromResourceName(resource_name):
  """Extract the "<parent>/sources/<id>" prefix from a full resource name.

  Args:
    resource_name: Full resource path that starts with organizations/,
      projects/ or folders/ and contains a /sources/[0-9]+ segment.

  Returns:
    The first four "/"-separated components of resource_name joined by "/".

  Raises:
    InvalidSCCInputError: If resource_name does not match the expected
      pattern.
  """
  source_pattern = re.compile(
      "(organizations|projects|folders)/.*/sources/[0-9]+")
  if source_pattern.match(resource_name) is None:
    raise InvalidSCCInputError(
        "When providing a full resource path, it must also include "
        "the organization, project, or folder prefix.")
  # A match implies at least three "/" separators, so four components exist:
  # the two parent components followed by the two source components.
  components = resource_name.split("/")
  return "/".join(components[:4])


def ProcessCustomEtdConfigFile(file_contents):
  """Parse the JSON configuration file of an ETD custom module.

  Args:
    file_contents: JSON text of the configuration file.

  Returns:
    The parsed JSON encoded as an EventThreatDetectionCustomModule.ConfigValue
    message.

  Raises:
    InvalidCustomConfigFileError: If file_contents is not valid JSON.
  """
  scc_messages = sc_client.GetMessages()
  try:
    parsed_config = json.loads(file_contents)
    config_type = scc_messages.EventThreatDetectionCustomModule.ConfigValue
    return encoding.DictToMessage(parsed_config, config_type)
  except json.JSONDecodeError as decode_error:
    message = "Error parsing custom config file [{}]".format(decode_error)
    raise InvalidCustomConfigFileError(message)


def ProcessCustomConfigFile(file_contents):
  """Parse the YAML custom configuration file of a custom module.

  Args:
    file_contents: YAML text of the configuration file.

  Returns:
    The parsed YAML encoded as a GoogleCloudSecuritycenterV1CustomConfig
    message.

  Raises:
    InvalidCustomConfigFileError: If the YAML cannot be parsed.
  """
  scc_messages = sc_client.GetMessages()
  try:
    parsed_config = yaml.load(file_contents)
    config_type = scc_messages.GoogleCloudSecuritycenterV1CustomConfig
    return encoding.DictToMessage(parsed_config, config_type)
  except yaml.YAMLParseError as parse_error:
    message = "Error parsing custom config file [{}]".format(parse_error)
    raise InvalidCustomConfigFileError(message)


def ExtractTestData(test_data_input):
  """Pull the test data records out of parsed file content.

  Args:
    test_data_input: Parsed file content; a list of records or a dict that
      keeps the records under the "testData" key.

  Returns:
    The list itself when given a list; the "testData" value when given a dict
    that has that key; otherwise None (a dict without "testData", or any
    other truthy value).

  Raises:
    InvalidTestDataFileError: If the input is neither a list nor a dict and
      is empty/falsy.
  """
  if isinstance(test_data_input, list):
    return test_data_input
  if isinstance(test_data_input, dict):
    # Yields None when the key is absent.
    return test_data_input.get("testData")
  if test_data_input:
    # Truthy input of an unsupported type: no records to extract.
    return None
  raise InvalidTestDataFileError(
      "Error parsing test data file: no data records defined in file"
  )


def ProcessTestResourceDataFile(file_contents):
  """Process the test resource data file for the custom module to test against.

  Args:
    file_contents: YAML text of the test data file.

  Returns:
    A list of TestData messages, one per record in the file.

  Raises:
    InvalidTestDataFileError: If the YAML cannot be parsed or the file
      defines no test data records.
  """
  messages = sc_client.GetMessages()
  try:
    test_data = ExtractTestData(yaml.load(file_contents))
  except yaml.YAMLParseError as ype:
    raise InvalidTestDataFileError(
        "Error parsing test data file [{}]".format(ype)
    )
  # ExtractTestData returns None when no records could be located (e.g. a
  # mapping without "testData"); report that instead of failing with a
  # TypeError while iterating over None.
  if test_data is None:
    raise InvalidTestDataFileError(
        "Error parsing test data file: no data records defined in file"
    )
  return [
      encoding.DictToMessage(record, messages.TestData)
      for record in test_data
  ]


def ProcessSimulatedResourceFile(file_contents):
  """Parse the YAML simulated resource file used to validate a custom module.

  Args:
    file_contents: YAML text of the simulated resource file.

  Returns:
    The parsed YAML encoded as a SimulatedResource message.

  Raises:
    InvalidResourceFileError: If the YAML cannot be parsed.
  """
  scc_messages = sc_client.GetMessages()
  try:
    parsed_resource = yaml.load(file_contents)
    resource_type = scc_messages.SimulatedResource
    return encoding.DictToMessage(parsed_resource, resource_type)
  except yaml.YAMLParseError as parse_error:
    message = "Error parsing resource file [{}]".format(parse_error)
    raise InvalidResourceFileError(message)

def ProcessTFPlanFile(file_contents):
  """Validate a Terraform plan file as JSON and re-serialize it to bytes.

  Args:
    file_contents: JSON text of the Terraform plan file.

  Returns:
    The plan re-serialized with json.dumps and encoded as UTF-8 bytes.

  Raises:
    InvalidCustomConfigFileError: If file_contents is not valid JSON.
  """
  try:
    plan = json.loads(file_contents)
  except json.JSONDecodeError as decode_error:
    message = "Error parsing terraform plan file [{}]".format(decode_error)
    raise InvalidCustomConfigFileError(message)
  serialized_plan = json.dumps(plan)
  return serialized_plan.encode("utf-8")