HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/scc/remediation_intents/auto_remediate.py
# -*- coding: utf-8 -*- #
# Copyright 2025 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command for semi-automatic remediation of SCC findings."""

import copy
import uuid

from googlecloudsdk.api_lib.scc.remediation_intents import const
from googlecloudsdk.api_lib.scc.remediation_intents import converters
from googlecloudsdk.api_lib.scc.remediation_intents import extended_service
from googlecloudsdk.api_lib.scc.remediation_intents import git
from googlecloudsdk.api_lib.scc.remediation_intents import sps_api
from googlecloudsdk.api_lib.scc.remediation_intents import terraform
from googlecloudsdk.api_lib.scc.remediation_intents import validators
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.scc.remediation_intents import flags
from googlecloudsdk.core import log


@base.ReleaseTracks(base.ReleaseTrack.ALPHA)
@base.UniverseCompatible
class AutoRemediate(base.SilentCommand, base.CacheCommand):
  """Command for semi-automatic remediation of SCC findings."""
  # NOTE: the class docstring and `detailed_help` are user-facing help text;
  # keep them in sync with the flags registered in Args().
  detailed_help = {
      "DESCRIPTION": """
        Orchestrates the semi-automatic remediation process for SCC findings
        by calling the Remediation Intent APIs.
        """,

      "EXAMPLES": """
          Sample usage:
          Remediate a SCC finding for the organization 1234567890, in the
          terraform repository located at ./terraform-repo.
          $ {{command}} scc remediation-intents auto-remediate \\
            --org-id=1234567890 \\
            --root-dir-path=./terraform-repo \\
            --git-config-path=./git-config.yaml""",
  }

  @staticmethod
  def Args(parser):
    """Registers the command's flags on the given parser.

    Args:
      parser: the argument parser the flags are added to. The root directory
        flag is additionally given a default of "." (current directory).
    """
    flags.ROOT_DIR_PATH_FLAG.AddToParser(parser)
    flags.ROOT_DIR_PATH_FLAG.SetDefault(parser, ".")
    flags.ORG_ID_FLAG.AddToParser(parser)
    flags.GIT_CONFIG_FILE_PATH_FLAG.AddToParser(parser)

  def Run(self, args) -> None:
    """The main function which is called when the user runs this command.

    Workflow, in order:
      1. Validate the git config and the root directory path.
      2. Fetch an enqueued remediation intent, creating one if none exists.
      3. Collect the TF files / TF state and move the intent to
         REMEDIATION_IN_PROGRESS through the Update API.
      4. Validate the remediated TF files returned by the service; on a
         validation error send the error back and retry, at most
         const.REMEDIATION_RETRY_COUNT times.
      5. On success mark the intent REMEDIATION_SUCCESS, push a commit and
         create a PR, then record PR_GENERATION_SUCCESS (with the PR data) or
         PR_GENERATION_FAILED (with the error details).

    The method returns early, after logging a message, whenever a step cannot
    proceed (no intent, no TF files, remediation or PR creation failure).

    Args:
      args: an argparse namespace. All the arguments that were provided to this
        command invocation.
    """
    # Set up the variables.
    org_id = args.org_id
    # Despite the flag name, this value is used as a mapping further down
    # (keys read: "remote", "branch-prefix", "main-branch-name", "reviewers").
    git_config_data = args.git_config_path
    root_dir_path = args.root_dir_path
    # The extended service client to interact with the SPS service.
    client = extended_service.ExtendedSPSClient(org_id, base.ReleaseTrack.ALPHA)
    # The converter instance to handle conversion between different data types.
    converter = converters.RemediationIntentConverter(base.ReleaseTrack.ALPHA)
    messages = sps_api.GetMessagesModule(base.ReleaseTrack.ALPHA)
    # Short alias for the intent state enum, which is referenced repeatedly.
    intent_states = messages.RemediationIntent.StateValueValuesEnum

    # Validate the input arguments.
    validators.validate_git_config(git_config_data)
    validators.validate_relative_dir_path(root_dir_path)

    # Fetch an enqueued remediation intent which needs to be remediated.
    intent_data = client.fetch_enqueued_remediation_intent()
    if intent_data is None:
      # Create a new intent if no enqueued intent is found, then fetch again.
      client.create_semi_autonomous_remediation_intent()
      intent_data = client.fetch_enqueued_remediation_intent()
      if intent_data is None:
        # Exit gracefully if still no intent is found.
        log.Print("No remediation intent found to be remediated, exitting...")
        return
    intent_name = intent_data.name

    tf_files = terraform.fetch_tf_files(root_dir_path)
    if not tf_files:  # Exit gracefully if no TF files are found.
      log.Print("No TF files found, exitting...")
      return
    # Parse the TFState information for the given finding data.
    tfstate_data = terraform.parse_tf_file(
        root_dir_path, intent_data.findingData
    )

    log.Print("Remediation started....")
    # Update the state to REMEDIATION_IN_PROGRESS and start the remediation.
    # Work on a deep copy so the fetched intent object is left unmodified.
    intent_updated = copy.deepcopy(intent_data)
    intent_updated.state = intent_states.REMEDIATION_IN_PROGRESS
    intent_updated.remediationInput = messages.RemediationInput(
        tfData=messages.TfData(
            fileData=converter.DictFilesToMessage(tf_files),
            tfStateInfo=tfstate_data,
        )
    )
    update_mask = "state,remediation_input"
    intent_updated = client.update_remediation_intent(  # Call the Update API.
        intent_name, update_mask, intent_updated
    )
    if intent_updated.state == intent_states.REMEDIATION_FAILED:
      log.Print("Remediation failed, exitting...")
      return

    # Retry the remediation process for certain number of times.
    is_remediated = False
    retry_count = 0
    while not is_remediated and retry_count < const.REMEDIATION_RETRY_COUNT:
      log.Print("Remediation retry count: ", retry_count)
      # Only the first output entry of the remediated output is consumed.
      updated_tf_files = converter.MessageFilesToDict(
          intent_updated.remediatedOutput.outputData[0].tfData.fileData
      )
      error_msg = terraform.validate_tf_files(updated_tf_files)
      if error_msg is None:  # Remediation is successful.
        is_remediated = True
        break
      # Send the error details to the server and retry the remediation.
      intent_updated.remediationInput.errorDetails = messages.ErrorDetails(
          reason=error_msg
      )
      update_mask = "remediation_input.error_details"
      intent_updated = client.update_remediation_intent(
          intent_name, update_mask, intent_updated
      )
      if intent_updated.state == intent_states.REMEDIATION_FAILED:
        log.Print("Remediation failed, exitting...")
        return
      retry_count += 1  # Update the retry count.
      log.Print("Remediation failed, retrying...")

    if not is_remediated:  # Mark the state as REMEDIATION_FAILED and exit.
      log.Print("Remediation failed: Max retry limit reached.")
      intent_updated.state = intent_states.REMEDIATION_FAILED
      update_mask = "state"
      _ = client.update_remediation_intent(  # Call the Update API.
          intent_name, update_mask, intent_updated
      )
      return

    log.Print("Remediation completed successfully.")
    # Mark the state as REMEDIATION_SUCCESS and clear any stale error details.
    intent_updated.state = intent_states.REMEDIATION_SUCCESS
    intent_updated.remediationInput.errorDetails = None
    update_mask = "state,remediation_input.error_details"
    intent_updated = client.update_remediation_intent(  # Call the Update API.
        intent_name, update_mask, intent_updated
    )

    # Generate the PR for the remediated output.
    log.Print("Starting PR generation process...")
    updated_tf_files = converter.MessageFilesToDict(
        intent_updated.remediatedOutput.outputData[0].tfData.fileData
    )
    # Append a random UUID to the configured prefix to form the branch name.
    git_config_data["branch-prefix"] += str(uuid.uuid4())

    # The commit message and the PR title share the same placeholders, so the
    # finding name is split only once: index 1 is used as the project id and
    # the last segment as the finding id.
    finding_name_parts = intent_updated.findingData.findingName.split("/")
    finding_labels = {
        "project_id": finding_name_parts[1],
        "finding_id": finding_name_parts[-1],
        "category": intent_updated.findingData.category,
    }

    git.push_commit(
        updated_tf_files,
        const.COMMIT_MSG.format(**finding_labels),
        git_config_data["remote"],
        git_config_data["branch-prefix"],
    )
    log.Print("Commit pushed successfully.")

    pr_title = const.PR_TITLE.format(**finding_labels)
    # Add the remediation explanation to the PR description. Backticks in the
    # explanation are escaped with a backslash before being embedded.
    pr_description = const.PR_DESC.format(
        remediation_explanation=(
            intent_updated.remediatedOutput.remediationExplanation.replace(
                "`", r"\`"
            )
        ),
        file_modifiers="\n".join(
            f"{fp}: {ea}"
            for fp, ea in git.get_file_modifiers(updated_tf_files).items()
        ),
        file_owners="\n".join(
            f"{fp}: {ea}"
            for fp, ea in git.get_file_modifiers(
                updated_tf_files, first=True
            ).items()
        ),
    )
    pr_status, pr_msg = git.create_pr(
        pr_title,
        pr_description,
        git_config_data["remote"],
        git_config_data["branch-prefix"],
        git_config_data["main-branch-name"],
        git_config_data["reviewers"],
    )
    # Update the state and error details if the PR creation fails, and exit.
    # On failure `pr_msg` is stored as the error reason.
    if not pr_status:
      log.Print("PR creation failed, exitting...")
      intent_updated.state = intent_states.PR_GENERATION_FAILED
      intent_updated.errorDetails = messages.ErrorDetails(reason=pr_msg)
      update_mask = "state,error_details"
      _ = client.update_remediation_intent(
          intent_name, update_mask, intent_updated
      )
      return

    # Finally Update the state and PR details if the PR creation is successful.
    # On success `pr_msg` is stored as the PR URL.
    log.Print("PR created successfully.")
    intent_updated.state = intent_states.PR_GENERATION_SUCCESS
    # Query the file owners once and derive both lists from the same mapping,
    # so that owners and paths are guaranteed to line up index by index.
    file_owners = git.get_file_modifiers(updated_tf_files, first=True)
    intent_updated.remediationArtifacts = messages.RemediationArtifacts(
        prData=messages.PullRequest(
            url=pr_msg,
            modifiedFileOwners=list(file_owners.values()),
            modifiedFilePaths=list(file_owners.keys()),
        )
    )
    update_mask = "state,remediation_artifacts"
    _ = client.update_remediation_intent(
        intent_name, update_mask, intent_updated
    )