HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/terraform/vet.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Validate that a terraform plan complies with policies."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os.path

from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.util.anthos import binary_operations
from googlecloudsdk.core import log
from googlecloudsdk.core import metrics
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import progress_tracker
from googlecloudsdk.core.credentials.store import GetFreshAccessToken
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import files

MISSING_BINARY = ('Could not locate terraform-tools executable [{binary}]. '
                  'Please ensure gcloud terraform-tools component is '
                  'properly installed. '
                  'See https://cloud.google.com/sdk/docs/components for '
                  'more details.')


class TerraformToolsTfplanToCaiOperation(
    binary_operations.StreamingBinaryBackedOperation):
  """Streaming operation for Terraform Tools tfplan-to-cai command."""
  custom_errors = {}

  def __init__(self, **kwargs):
    custom_errors = {
        'MISSING_EXEC': MISSING_BINARY.format(binary='terraform-tools'),
    }
    super(TerraformToolsTfplanToCaiOperation, self).__init__(
        binary='terraform-tools',
        check_hidden=True,
        install_if_missing=True,
        custom_errors=custom_errors,
        structured_output=True,
        **kwargs)

  def _ParseArgsForCommand(self, command, terraform_plan_json, project, region,
                           zone, verbosity, output_path, **kwargs):
    args = [
        command,
        terraform_plan_json,
        '--output-path',
        output_path,
        '--verbosity',
        verbosity,
        '--user-agent',
        metrics.GetUserAgent(),
    ]
    if project:
      args += ['--project', project]
    if region:
      args += ['--region', region]
    if zone:
      args += ['--zone', zone]
    return args


class TerraformToolsValidateOperation(binary_operations.BinaryBackedOperation):
  """operation for Terraform Tools validate-cai command."""
  custom_errors = {}

  def __init__(self, **kwargs):
    custom_errors = {
        'MISSING_EXEC': MISSING_BINARY.format(binary='terraform-tools'),
    }
    super(TerraformToolsValidateOperation, self).__init__(
        binary='terraform-tools',
        check_hidden=True,
        # Install will be handled by the conversion operation
        install_if_missing=False,
        custom_errors=custom_errors,
        **kwargs)

  def _ParseArgsForCommand(self, command, input_file, policy_library, verbosity,
                           **kwargs):
    args = [
        command,
        input_file,
        '--verbosity',
        verbosity,
        '--policy-library',
        os.path.expanduser(policy_library),
    ]
    return args


@base.ReleaseTracks(base.ReleaseTrack.ALPHA, base.ReleaseTrack.BETA)
class Vet(base.Command):
  """Validate that a terraform plan complies with policies."""

  detailed_help = {
      'EXAMPLES':
          """
        To validate that a terraform plan complies with a policy library
        at `/my/policy/library`:

        $ {command} tfplan.json --policy-library=/my/policy/library
        """,
  }

  @staticmethod
  def Args(parser):
    parser.add_argument(
        'terraform_plan_json',
        help=(
            'File which contains a JSON export of a terraform plan. This file '
            'will be validated against the given policy library.'),
    )
    parser.add_argument(
        '--policy-library',
        required=True,
        help='Directory which contains a policy library',
    )
    parser.add_argument(
        '--zone',
        required=False,
        help='Default zone to use for resources that do not have one set',
    )
    parser.add_argument(
        '--region',
        required=False,
        help='Default region to use for resources that do not have one set',
    )

  def Run(self, args):
    tfplan_to_cai_operation = TerraformToolsTfplanToCaiOperation()
    validate_cai_operation = TerraformToolsValidateOperation()
    validate_tfplan_operation = TerraformToolsValidateOperation()

    env_vars = {
        'GOOGLE_OAUTH_ACCESS_TOKEN':
            GetFreshAccessToken(account=properties.VALUES.core.account.Get()),
        'USE_STRUCTURED_LOGGING':
            'true',
    }

    proxy_env_names = [
        'HTTP_PROXY', 'http_proxy', 'HTTPS_PROXY', 'https_proxy', 'NO_PROXY',
        'no_proxy'
    ]

    # env names and orders are from
    # https://registry.terraform.io/providers/hashicorp/google/latest/docs/guides/provider_reference#full-reference
    project_env_names = [
        'GOOGLE_PROJECT',
        'GOOGLE_CLOUD_PROJECT',
        'GCLOUD_PROJECT',
    ]

    zone_env_names = [
        'GOOGLE_ZONE',
        'GCLOUD_ZONE',
        'CLOUDSDK_COMPUTE_ZONE',
    ]

    region_env_names = [
        'GOOGLE_REGION',
        'GCLOUD_REGION',
        'CLOUDSDK_COMPUTE_REGION',
    ]

    for env_key, env_val in os.environ.items():
      if env_key in proxy_env_names:
        env_vars[env_key] = env_val

    with files.TemporaryDirectory() as tempdir:
      cai_assets = os.path.join(tempdir, 'cai_assets.json')

      # project flag and CLOUDSDK_CORE_PROJECT env are linked with core property
      project = properties.VALUES.core.project.Get()
      if project:
        log.debug('Setting project to {} from properties'.format(project))
      else:
        for env_key in project_env_names:
          project = encoding.GetEncodedValue(os.environ, env_key)
          if project:
            log.debug('Setting project to {} from env {}'.format(
                project, env_key))
            break

      region = ''
      if args.region:
        region = args.region
        log.debug('Setting region to {} from args'.format(region))
      else:
        for env_key in region_env_names:
          region = encoding.GetEncodedValue(os.environ, env_key)
          if region:
            log.debug('Setting region to {} from env {}'.format(
                region, env_key))
            break

      zone = ''
      if args.zone:
        zone = args.zone
        log.debug('Setting zone to {} from args'.format(zone))
      else:
        for env_key in zone_env_names:
          zone = encoding.GetEncodedValue(os.environ, env_key)
          if zone:
            log.debug('Setting zone to {} from env {}'.format(zone, env_key))
            break

      response = tfplan_to_cai_operation(
          command='tfplan-to-cai',
          project=project,
          region=region,
          zone=zone,
          terraform_plan_json=args.terraform_plan_json,
          verbosity=args.verbosity,
          output_path=cai_assets,
          env=env_vars)
      self.exit_code = response.exit_code
      if self.exit_code > 0:
        # The streaming binary backed operation handles its own writing to
        # stdout and stderr, so there's nothing left to do here.
        return None

      with progress_tracker.ProgressTracker(
          message='Validating resources',
          aborted_message='Aborted validation.'):
        cai_response = validate_cai_operation(
            command='validate-cai',
            policy_library=args.policy_library,
            input_file=cai_assets,
            verbosity=args.verbosity,
            env=env_vars)
        tfplan_response = validate_tfplan_operation(
            command='validate-tfplan',
            policy_library=args.policy_library,
            input_file=args.terraform_plan_json,
            verbosity=args.verbosity,
            env=env_vars)

    # exit code 2 from a validate_* command indicates violations; we need to
    # pass that through to users so they can detect this case. However, if
    # either command errors out (exit code 1) return that instead.
    if cai_response.exit_code == 1 or tfplan_response.exit_code == 1:
      self.exit_code = 1
    elif cai_response.exit_code == 2 or tfplan_response.exit_code == 2:
      self.exit_code = 2

    # Output from validate commands uses "structured output", same as the
    # streaming output from conversion. The final output should be a combined
    # list of violations.
    violations = []

    for policy_type, response in (('CAI', cai_response), ('Terraform',
                                                          tfplan_response)):
      if response.stdout:
        try:
          msg = binary_operations.ReadStructuredOutput(
              response.stdout, as_json=True)
        except binary_operations.StructuredOutputError:
          log.warning('Could not parse {} policy validation output.'.format(
              policy_type))
        else:
          violations += msg.resource_body
      if response.stderr:
        handler = binary_operations.DefaultStreamStructuredErrHandler(None)
        for line in response.stderr.split('\n'):
          handler(line)

    return violations