HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/scc/iac_remediation/terraform.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Library for fetching TF Files."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import os
import re

from typing import Dict, List
from googlecloudsdk.command_lib.code import run_subprocess
from googlecloudsdk.core.util import files
import hcl2


def get_tfstate_information_per_member(
    iam_bindings: Dict[str, Dict[str, List[str]]],
    tfstate_json_list: List[Dict[str, str]],
    resource_name: str,
) -> Dict[str, List[Dict[str, str]]]:
  """Gets the TFState information for the given IAM bindings.

  For every member, every TFState json is searched for a resource matching
  each role listed under the member's "ADD" and "REMOVE" bindings.

  Args:
    iam_bindings: IAM bindings for the resource, keyed by member. Each value
      maps an action ("ADD" and/or "REMOVE") to a list of roles.
    tfstate_json_list: List of TFState jsons to search.
    resource_name: Resource name for which the finding was generated.

  Returns:
    Dict mapping each member to the list of matching TFState resources. Members
    without any match are omitted; the dict is empty when nothing matches.
  """
  tfstate_information: Dict[str, List[Dict[str, str]]] = {}
  for member, binding in iam_bindings.items():
    for tfstate_json in tfstate_json_list:
      # "ADD" roles are looked up before "REMOVE" roles for each TFState json.
      for action in ("ADD", "REMOVE"):
        if action not in binding:
          continue
        for role in binding[action]:
          resource_data = fetch_relevant_modules(
              tfstate_json, resource_name, role, member
          )
          if resource_data:
            tfstate_information.setdefault(member, []).append(resource_data)
  # Return only after every member has been processed.
  return tfstate_information


def read_original_files_content(
    tf_files_paths: List[str],
) -> Dict[str, str]:
  """Loads the current content of every given TF file.

  Args:
    tf_files_paths: Paths of the TF files to read.

  Returns:
    Dict mapping each file path to the content read from that path.
  """
  return {
      tf_path: files.ReadFileContents(tf_path) for tf_path in tf_files_paths
  }


def update_tf_files(
    response_dict: Dict[str, str],
):
  """Writes each entry of the response dict to its file path.

  Args:
    response_dict: Dict mapping a TF file path to the content to write there.
  """
  for tf_path in response_dict:
    files.WriteFileContents(tf_path, response_dict[tf_path])


def validate_tf_files(
    response_dict: Dict[str, str]
) -> (bool, Dict[str, str]):
  """Validates the proposed TF file contents with terraform.

  Each proposed file is written to disk and formatted with `terraform fmt`,
  then `terraform validate` is run in the current working directory. The
  files' original contents are written back before returning, whatever the
  outcome.

  Args:
    response_dict: Dict mapping a TF file path to its proposed content. The
      entries are replaced in place with the `terraform fmt` output.

  Returns:
    A `(is_valid, payload)` pair:
      - `(True, response_dict)` when the first line of the validate output
        contains "Success" (case-insensitive); the dict holds the formatted
        contents.
      - `(False, exception)` when writing, formatting or validating raised.
      - `(False, None)` when validation ran but did not report success.
  """
  original_tf_files = {}
  try:
    for file_path, file_data in response_dict.items():
      # Snapshot before overwriting so the file can be restored afterwards.
      original_tf_files[file_path] = files.ReadFileContents(file_path)
      try:
        files.WriteFileContents(file_path, file_data)
        cmd = ["terraform", "fmt", "-write=true", file_path]
        run_subprocess.GetOutputLines(cmd, timeout_sec=25, show_stderr=False)
        response_dict[file_path] = files.ReadFileContents(file_path)
      except Exception as e:  # pylint: disable=broad-exception-caught
        return False, e
    cmd = ["terraform", "validate"]
    try:
      validate_output = run_subprocess.GetOutputLines(
          cmd, timeout_sec=25, show_stderr=False
      )
    except Exception as e:  # pylint: disable=broad-exception-caught
      return False, e
  finally:
    # Restore on every exit path, including when snapshotting a later file
    # raises after earlier files have already been overwritten.
    update_tf_files(original_tf_files)
  # Guard against empty output before inspecting the first line.
  if validate_output and re.search(
      "Success", validate_output[0], re.IGNORECASE
  ):
    return True, response_dict
  return False, None


def fetch_tfstate_json_from_dir(dir_path: str) -> str:
  """Runs terraform in the given directory to obtain its TFState json.

  Changes the process working directory to `dir_path`, runs `terraform init`
  and then `terraform show -json`. The working directory is not changed back.

  Args:
    dir_path: The path of the directory to fetch the TFState from.

  Returns:
    The output of `terraform show -json` as returned by
    `run_subprocess.GetOutputLines`, or an empty string if changing directory
    or either terraform command raised.
  """
  init_cmd = ["terraform", "init"]
  show_cmd = ["terraform", "show", "-json"]
  try:
    os.chdir(dir_path)
    run_subprocess.GetOutputLines(init_cmd, timeout_sec=10)
    return run_subprocess.GetOutputLines(show_cmd, timeout_sec=10)
  except Exception:  # pylint: disable=broad-exception-caught
    return ""


def fetch_tfstate_json_from_file(file_path: str) -> str:
  """Reads a TFState file and parses it with `hcl2.loads`.

  Args:
    file_path: The path of the TFState file to read.

  Returns:
    The value produced by `hcl2.loads` for the file's content.
  """
  return hcl2.loads(files.ReadFileContents(file_path))


def fetch_relevant_modules(
    tfstate_json: Dict[str, str],
    resource_name: str, role_name: str, member_name: str,
) -> str:
  """Finds the first root-module resource matching member, role and project.

  Args:
    tfstate_json: TFState json to search; only
      `values.root_module.resources` is inspected.
    resource_name: Value the resource's `project_id` must equal.
    role_name: Value the resource's `role` must equal.
    member_name: Value the resource's `member` must equal.

  Returns:
    The first matching resource entry, or an empty string when the expected
    structure is missing or no resource matches.
  """
  if "values" not in tfstate_json:
    return ""
  state_values = tfstate_json["values"]
  if "root_module" not in state_values:
    return ""
  root_module = state_values["root_module"]
  if "resources" not in root_module:
    return ""

  expected = (
      ("member", member_name),
      ("role", role_name),
      ("project_id", resource_name),
  )
  for candidate in root_module["resources"]:
    if "values" not in candidate:
      continue
    attributes = candidate["values"]
    # All keys must be present before any value is compared.
    if not all(key in attributes for key, _ in expected):
      continue
    if all(attributes[key] == wanted for key, wanted in expected):
      return candidate
  return ""


def find_tf_files(root_dir: str) -> List[str]:
  """Finds all the TF files in the given directory.

  Walks `root_dir` breadth-first, skipping directories whose name starts with
  a dot, and collects every non-hidden file ending in `.tf` or `.tfvars`.

  Args:
    root_dir: The path of the directory to find the TF files in.

  Returns:
    A list of the TF files paths in the given directory.
  """
  tf_files = []
  queue = collections.deque([root_dir])
  while queue:
    current_dir = queue.popleft()
    for item in os.listdir(current_dir):
      # Hidden entries are judged by their own name, not the joined path, so
      # a root such as "./repo" does not make every file look hidden.
      if item.startswith("."):
        continue
      item_path = os.path.join(current_dir, item)
      if os.path.isdir(item_path):
        queue.append(item_path)
      elif os.path.isfile(item_path) and item.endswith((".tf", ".tfvars")):
        tf_files.append(item_path)
  return tf_files


def fetch_tfstate_list(
    tfstate_file_paths: List[str],
    root_dir: str,
) -> List[Dict[str, str]]:
  """Collects TFState jsons from explicit files or by scanning a directory.

  Args:
    tfstate_file_paths: TFState file paths to load. When empty, `root_dir` is
      scanned instead.
    root_dir: The path of the root directory used when no file paths are
      given.

  Returns:
    List of TFState json, one per given file path, or the result of scanning
    `root_dir` when no paths were given.
  """
  if not tfstate_file_paths:
    return find_tfstate_jsons(root_dir)
  return [
      fetch_tfstate_json_from_file(state_path)
      for state_path in tfstate_file_paths
  ]


def find_tfstate_jsons(
    dir_path: str
) -> List[Dict[str, str]]:
  """Finds the TFState jsons in the given directory.

  Walks `dir_path` breadth-first, skipping entries whose name starts with a
  dot, and fetches the TFState json of every directory visited (including
  `dir_path` itself).

  Args:
    dir_path: The path of the directory to find the TFState jsons in.

  Returns:
    List with one entry per visited directory, as returned by
    `fetch_tfstate_json_from_dir` (an empty string for directories where the
    fetch failed).
  """
  tfstate_jsons = []
  # fetch_tfstate_json_from_dir changes the working directory, so a relative
  # path would be resolved against the wrong directory on later iterations.
  # Resolve it once up front.
  queue = collections.deque([os.path.abspath(dir_path)])
  while queue:
    current_dir = queue.popleft()
    tfstate_jsons.append(fetch_tfstate_json_from_dir(current_dir))
    for item in os.listdir(current_dir):
      if item.startswith("."):
        continue
      item_path = os.path.join(current_dir, item)
      if os.path.isdir(item_path):
        queue.append(item_path)
  return tfstate_jsons