HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/surface/scc/iac_remediation/create.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command for remediating a Cloud Security Command Center Finding."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import json
from typing import Any

from googlecloudsdk.api_lib.scc.iac_remediation import findings
from googlecloudsdk.api_lib.scc.iac_remediation import git
from googlecloudsdk.api_lib.scc.iac_remediation import llm
from googlecloudsdk.api_lib.scc.iac_remediation import prompt
from googlecloudsdk.api_lib.scc.iac_remediation import pull_requests
from googlecloudsdk.api_lib.scc.iac_remediation import terraform
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.scc.iac_remediation import errors
from googlecloudsdk.command_lib.scc.iac_remediation import flags
from googlecloudsdk.core import log


@base.Hidden
@base.ReleaseTracks(base.ReleaseTrack.ALPHA, base.ReleaseTrack.GA)
@base.UniverseCompatible
class Create(base.CreateCommand):
  """Remediates a Security Command Center finding."""

  detailed_help = {
      "DESCRIPTION": "Remediates a Security Command Center finding.",
      "EXAMPLES": """
          Sample usage:

          $ {{command}} scc iac-remediation create --finding-org-id=123456789
          --finding-name=projects/123456789/sources/123456789/locations/global/findings/123456789
          --tfstate-file-paths=/path/to/file1.tfstate,/path/to/file2.tfstate --project-id=my-proj
          --git-config-path=/path/to/config.yaml""",
  }

  @staticmethod
  def Args(parser):
    flags.FINDING_ORG_ID_FLAG.AddToParser(parser)
    flags.FINDING_NAME_FLAG.AddToParser(parser)
    flags.LLM_PROJ_ID_FLAG.AddToParser(parser)
    flags.TFSTATE_FILE_PATHS_LIST_FLAG.AddToParser(parser)
    flags.GIT_CONFIG_FILE_PATH_FLAG.AddToParser(parser)

  def Run(self, args: Any) -> None:
    """Remediates a Security Command Center finding.

    Args:
      args: Arguments for the command.
    """
    git_flag, repo_root_dir = git.is_git_repo()
    if not git_flag:  # Throw error if command is invoked from a non-git repo
      raise errors.GitRepoNotFoundError()
    git.validate_git_config(args.git_config_path)
    git_config_data = args.git_config_path
    log.Print(git_config_data)
    log.Print("repo_root_dir: ", repo_root_dir)
    resp = findings.MakeApiCall(args.finding_org_id, args.finding_name)
    json_resp = json.loads(resp)
    iam_bindings = findings.FetchIAMBinding(json_resp)
    resource_name = findings.FetchResourceName(json_resp)
    tfstate_json_list = terraform.fetch_tfstate_list(
        args.tfstate_file_paths, repo_root_dir
    )
    if not tfstate_json_list:
      log.Print("No TFState files found.")
      return
    tfstate_information = terraform.get_tfstate_information_per_member(
        iam_bindings, tfstate_json_list, resource_name
    )
    if not tfstate_information:
      for tfstate_json in tfstate_json_list:
        if "google_project_iam_policy" in tfstate_json:
          tfstate_information = "google_project_iam_policy"
    tf_files = terraform.find_tf_files(repo_root_dir)
    original_files_content = terraform.read_original_files_content(tf_files)
    for member, role_data in iam_bindings.items():
      tfstate_data = ""
      if tfstate_information and member in tfstate_information:
        tfstate_data = tfstate_information[member]
      input_prompt = prompt.fetch_input_prompt(
          tfstate_data,
          role_data,
          resource_name,
          tf_files,
          member,
      )
      response = llm.MakeLLMCall(input_prompt, args.project_id)
      response_dict = prompt.llm_response_parser(response)
      check, response = terraform.validate_tf_files(response_dict)
      if not check:
        terraform.update_tf_files(original_files_content)
        raise errors.InvalidLLMResponseError(response)
      else:
        log.Print(response)
        git.push_commit(
            response_dict,
            pull_requests.CreateCommitMessage(resp, member),
            git_config_data["remote"],
            git_config_data["branch-prefix"]
            + findings.ParseName(args.finding_name),
        )
        terraform.update_tf_files(response_dict)
    git.raise_pr(
        pull_requests.CreatePRMessage(resp),
        pull_requests.CreatePRMessage(resp),
        git_config_data["remote"],
        git_config_data["branch-prefix"]
        + findings.ParseName(args.finding_name),
        git_config_data["main-branch-name"],
    )
    terraform.update_tf_files(original_files_content)