# File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/infra_manager/deploy_util.py
# -*- coding: utf-8 -*- #
# Copyright 2023 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Support library for managing deployments."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os
import sys
import uuid

from apitools.base.py import encoding
from apitools.base.py import exceptions as apitools_exceptions
from apitools.base.py import extra_types
from googlecloudsdk.api_lib.infra_manager import configmanager_util
from googlecloudsdk.api_lib.storage import storage_api
from googlecloudsdk.api_lib.storage import storage_util
from googlecloudsdk.calliope import exceptions as c_exceptions
from googlecloudsdk.command_lib.infra_manager import deterministic_snapshot
from googlecloudsdk.command_lib.infra_manager import errors
from googlecloudsdk.command_lib.infra_manager import staging_bucket_util
from googlecloudsdk.core import log
from googlecloudsdk.core import requests
from googlecloudsdk.core import resources
from googlecloudsdk.core.resource import resource_transform
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import times
import six


if sys.version_info >= (3, 6):
  # pylint: disable=g-import-not-at-top
  from googlecloudsdk.command_lib.infra_manager import tfvars_parser


def _UploadSourceDirToGCS(gcs_client, source, gcs_source_staging, ignore_file):
  """Uploads a local directory to GCS.

  Uploads one file at a time rather than tarballing/zipping for compatibility
  with the back-end.

  Args:
    gcs_client: a storage_api.StorageClient instance for interacting with GCS.
    source: string, a path to a local directory.
    gcs_source_staging: resources.Resource, the bucket to upload to. This must
      already exist.
    ignore_file: optional string, a path to a gcloudignore file.
  """

  source_snapshot = deterministic_snapshot.DeterministicSnapshot(
      source, ignore_file=ignore_file
  )

  size_str = resource_transform.TransformSize(source_snapshot.uncompressed_size)
  log.status.Print(
      'Uploading {num_files} file(s) totalling {size}.'.format(
          num_files=len(source_snapshot.files), size=size_str
      )
  )

  for file_metadata in source_snapshot.GetSortedFiles():
    full_local_path = os.path.join(file_metadata.root, file_metadata.path)

    target_obj_ref = 'gs://{0}/{1}/{2}'.format(
        gcs_source_staging.bucket, gcs_source_staging.object, file_metadata.path
    )
    target_obj_ref = resources.REGISTRY.Parse(
        target_obj_ref, collection='storage.objects'
    )

    gcs_client.CopyFileToGCS(full_local_path, target_obj_ref)
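

# A minimal sketch of the naming scheme above, using assumed example values:
# with gcs_source_staging.bucket='my-bucket' and
# gcs_source_staging.object='im_source_staging/us-central1/my-dep/123-abc',
# a local file 'modules/vpc.tf' would be copied to
# 'gs://my-bucket/im_source_staging/us-central1/my-dep/123-abc/modules/vpc.tf'.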


def _UploadSourceToGCS(
    source, stage_bucket, deployment_short_name, location, ignore_file
):
  """Uploads local content to GCS.

  This will ensure that the source and destination exist before triggering the
  upload.

  Args:
    source: string, a local path.
    stage_bucket: optional string. When not provided, the default staging bucket
      will be used (see GetDefaultStagingBucket). This string is of the format
      "gs://bucket-name/". An "im_source_staging" object will be created under
      this bucket, and any uploaded artifacts will be stored there.
    deployment_short_name: short name of the deployment.
    location: location of the deployment.
    ignore_file: string, a path to a gcloudignore file.

  Returns:
    A string in the format "gs://path/to/resulting/upload".

  Raises:
    RequiredArgumentException: if the staging bucket already exists and is
      owned by another project.
    BadFileException: if the source doesn't exist or isn't a directory.
  """
  gcs_client = storage_api.StorageClient()

  if stage_bucket is None:
    used_default_bucket_name = True
    gcs_source_staging_dir = staging_bucket_util.DefaultGCSStagingDir(
        deployment_short_name, location
    )
  else:
    used_default_bucket_name = False
    gcs_source_staging_dir = '{0}{1}/{2}/{3}'.format(
        stage_bucket,
        staging_bucket_util.STAGING_DIR,
        location,
        deployment_short_name,
    )

  # By calling REGISTRY.Parse on "gs://my-bucket/foo", the result's "bucket"
  # property will be "my-bucket" and the "object" property will be "foo".
  gcs_source_staging_dir_ref = resources.REGISTRY.Parse(
      gcs_source_staging_dir, collection='storage.objects'
  )

  # Make sure the bucket exists
  try:
    gcs_client.CreateBucketIfNotExists(
        gcs_source_staging_dir_ref.bucket,
        check_ownership=used_default_bucket_name,
    )
  except storage_api.BucketInWrongProjectError:
    raise c_exceptions.RequiredArgumentException(
        'stage-bucket',
        'A bucket with name {} already exists and is owned by '
        'another project. Specify a bucket using '
        '--stage-bucket.'.format(gcs_source_staging_dir_ref.bucket),
    )

  # This will look something like this:
  # "1615850562.234312-044e784992744951b0cd71c0b011edce"
  staged_object = '{stamp}-{uuid}'.format(
      stamp=times.GetTimeStampFromDateTime(times.Now()),
      uuid=uuid.uuid4().hex,
  )

  if gcs_source_staging_dir_ref.object:
    staged_object = gcs_source_staging_dir_ref.object + '/' + staged_object

  gcs_source_staging = resources.REGISTRY.Create(
      collection='storage.objects',
      bucket=gcs_source_staging_dir_ref.bucket,
      object=staged_object,
  )

  if not os.path.exists(source):
    raise c_exceptions.BadFileException(
        'could not find source [{}]'.format(source)
    )

  if not os.path.isdir(source):
    raise c_exceptions.BadFileException(
        'source is not a directory [{}]'.format(source)
    )

  _UploadSourceDirToGCS(gcs_client, source, gcs_source_staging, ignore_file)

  upload_bucket = 'gs://{0}/{1}'.format(
      gcs_source_staging.bucket, gcs_source_staging.object
  )

  return upload_bucket
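

# Illustrative end-to-end result (assumed values): with
# stage_bucket='gs://my-bucket/', location='us-central1', and deployment 'd1',
# the returned path looks like
# 'gs://my-bucket/<STAGING_DIR>/us-central1/d1/<timestamp>-<uuid hex>', where
# <STAGING_DIR> stands for staging_bucket_util.STAGING_DIR.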


def UpdateDeploymentDeleteRequestWithForce(unused_ref, unused_args, request):
  """UpdateDeploymentDeleteRequestWithForce adds force flag to delete request."""

  request.force = True
  return request


def DeleteCleanupStagedObjects(response, unused_args):
  """DeploymentDeleteCleanupStagedObjects deletes staging gcs objects created as part of deployment apply command."""

  # Do not delete staged objects if the delete resulted in an error.
  if response.error is not None:
    return
  if response.metadata is not None:
    md = encoding.MessageToPyValue(response.metadata)
    # The target looks like "projects/{p}/locations/{l}/deployments/{d}" (or
    # ".../previews/{p}"), so index 3 is the location and index 5 is the
    # entity ID.
    entity_full_name = md['target']
    entity_id = entity_full_name.split('/')[5]
    location = entity_full_name.split('/')[3]
    staging_gcs_directory = staging_bucket_util.DefaultGCSStagingDir(
        entity_id, location
    )
    gcs_client = storage_api.StorageClient()
    staging_bucket_util.DeleteStagingGCSFolder(
        gcs_client, staging_gcs_directory
    )
  return response


def _CreateTFBlueprint(
    messages,
    deployment_short_name,
    preview_short_name,
    location,
    local_source,
    stage_bucket,
    ignore_file,
    gcs_source,
    git_source_repo,
    git_source_directory,
    git_source_ref,
    input_values,
):
  """Returns the TerraformBlueprint message.

  Args:
    messages: ModuleType, the messages module that lets us form Config API
      messages based on our protos.
    deployment_short_name: short name of the deployment.
    preview_short_name: short name of the preview.
    location: location of the deployment.
    local_source: Local storage path where config files are stored.
    stage_bucket: optional string. Destination for storing local config files
      specified by local source flag. e.g. "gs://bucket-name/".
    ignore_file: optional string, a path to a gcloudignore file.
    gcs_source: URI of an object in Google Cloud Storage, e.g.
      `gs://{bucket}/{object}`
    git_source_repo: Repository URL.
    git_source_directory: Subdirectory inside the git repository.
    git_source_ref: Git branch or tag.
    input_values: Input variable values for the Terraform blueprint. It only
      accepts (key, value) pairs where value is a scalar value.

  Returns:
    A messages.TerraformBlueprint to use with deployment operation.
  """

  terraform_blueprint = messages.TerraformBlueprint(
      inputValues=input_values,
  )

  if gcs_source is not None:
    terraform_blueprint.gcsSource = gcs_source
  elif local_source is not None and deployment_short_name is not None:
    upload_bucket = _UploadSourceToGCS(
        local_source, stage_bucket, deployment_short_name, location, ignore_file
    )
    terraform_blueprint.gcsSource = upload_bucket
  elif local_source is not None and preview_short_name is not None:
    upload_bucket = _UploadSourceToGCS(
        local_source, stage_bucket, preview_short_name, location, ignore_file
    )
    terraform_blueprint.gcsSource = upload_bucket
  else:
    terraform_blueprint.gitSource = messages.GitSource(
        repo=git_source_repo,
        directory=git_source_directory,
        ref=git_source_ref,
    )

  return terraform_blueprint


def _CreateProviderConfig(
    messages,
    provider_source,
):
  """Returns the ProviderConfig message.

  Args:
    messages: ModuleType, the messages module that lets us form Config API
      messages based on our protos.
    provider_source: Input to control from where to fetch providers.

  Returns:
      A messages.ProviderConfig to use with deployment operation or None if
      provider_source is not set.
  """
  if provider_source is None:
    return None
  provider_config = messages.ProviderConfig(
      sourceType=messages.ProviderConfig.SourceTypeValueValuesEnum(
          provider_source
      )
  )
  return provider_config


def Apply(
    messages,
    async_,
    deployment_full_name,
    service_account,
    tf_version_constraint=None,
    local_source=None,
    stage_bucket=None,
    ignore_file=None,
    import_existing_resources=False,
    artifacts_gcs_bucket=None,
    worker_pool=None,
    gcs_source=None,
    git_source_repo=None,
    git_source_directory=None,
    git_source_ref=None,
    input_values=None,
    inputs_file=None,
    labels=None,
    quota_validation=None,
    annotations=None,
    provider_source=None,
):
  """Updates the deployment if one exists, otherwise creates a deployment.

  Args:
    messages: ModuleType, the messages module that lets us form blueprints API
      messages based on our protos.
    async_: bool, if True, gcloud will return immediately, otherwise it will
      wait on the long-running operation.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".
    service_account: User-specified Service Account (SA) to be used as
      credential to manage resources. e.g.
      `projects/{projectID}/serviceAccounts/{serviceAccount}`. The default
      Cloud Build SA will be used initially if this field is not set.
    tf_version_constraint: User-specified Terraform version constraint, for
      example, "=1.3.10".
    local_source: Local storage path where config files are stored.
    stage_bucket: optional string. Destination for storing local config files
      specified by local source flag. e.g. "gs://bucket-name/".
    ignore_file: optional string, a path to a gcloudignore file.
    import_existing_resources: By default, Infrastructure Manager will return a
      failure when Terraform encounters a 409 code (resource conflict error)
      during actuation. If this flag is set to true, Infrastructure Manager will
      instead attempt to automatically import the resource into the Terraform
      state (for supported resource types) and continue actuation.
    artifacts_gcs_bucket: User-defined location of Cloud Build logs and
      artifacts in Google Cloud Storage, e.g. `gs://{bucket}/{folder}`. A
      default bucket will be bootstrapped if the field is not set or empty.
    worker_pool: The User-specified Worker Pool resource in which the Cloud
      Build job will execute. If this field is unspecified, the default Cloud
      Build worker pool will be used. e.g.
      projects/{project}/locations/{location}/workerPools/{workerPoolId}
    gcs_source: URI of an object in Google Cloud Storage, e.g.
      `gs://{bucket}/{object}`
    git_source_repo: Repository URL.
    git_source_directory: Subdirectory inside the git repository.
    git_source_ref: Git branch or tag.
    input_values: Input variable values for the Terraform blueprint. It only
      accepts (key, value) pairs where value is a scalar value.
    inputs_file: Accepts .tfvars file.
    labels: User-defined metadata for the deployment.
    quota_validation: Input to control quota checks for resources in Terraform
      configuration files.
    annotations: User-defined annotations for the deployment.
    provider_source: Input to control from where to fetch providers.

  Returns:
    The resulting Deployment resource or, in the case that async_ is True, a
      long-running operation.

  Raises:
    InvalidArgumentException: If an invalid set of flags is provided (e.g.
      trying to run with --target-git-subdir but without --target-git).
  """

  labels_message, annotations_message = _GeneratesLabelsAnnotations(
      messages.Deployment(), labels, annotations
  )

  additional_properties = []
  if input_values is not None:
    for key, value in six.iteritems(input_values):
      additional_properties.append(
          messages.TerraformBlueprint.InputValuesValue.AdditionalProperty(
              key=key,
              value=messages.TerraformVariable(
                  inputValue=encoding.PyValueToMessage(
                      extra_types.JsonValue, value
                  )
              ),
          )
      )
  elif inputs_file is not None:
    if sys.version_info < (3, 6):
      raise errors.InvalidDataError(
          '--inputs-file flag is only supported for Python version 3.6 and'
          ' above.'
      )
    tfvar_values = tfvars_parser.ParseTFvarFile(inputs_file)
    for key, value in six.iteritems(tfvar_values):
      additional_properties.append(
          messages.TerraformBlueprint.InputValuesValue.AdditionalProperty(
              key=key,
              value=messages.TerraformVariable(
                  inputValue=encoding.PyValueToMessage(
                      extra_types.JsonValue, value
                  )
              ),
          )
      )

  tf_input_values = messages.TerraformBlueprint.InputValuesValue(
      additionalProperties=additional_properties
  )
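  # Illustrative mapping (assumed values): input_values={'region': 'us-east1'}
  # yields one AdditionalProperty with key='region' whose TerraformVariable
  # wraps extra_types.JsonValue(string_value='us-east1').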

  deployment_ref = resources.REGISTRY.Parse(
      deployment_full_name, collection='config.projects.locations.deployments'
  )
  # Get just the ID from the fully qualified name.
  deployment_id = deployment_ref.Name()

  location = deployment_ref.Parent().Name()

  # Check if a deployment with the given name already exists. If it does, we'll
  # update that deployment. If not, we'll create it.
  try:
    existing_deployment = configmanager_util.GetDeployment(deployment_full_name)
  except apitools_exceptions.HttpNotFoundError:
    existing_deployment = None

  if existing_deployment is not None:
    # Clean up objects before uploading any new staging object.
    _CleanupGCSStagingObjectsNotInUse(
        location, deployment_full_name, deployment_id
    )

  tf_blueprint = _CreateTFBlueprint(
      messages,
      deployment_id,
      None,
      location,
      local_source,
      stage_bucket,
      ignore_file,
      gcs_source,
      git_source_repo,
      git_source_directory,
      git_source_ref,
      tf_input_values,
  )
  provider_config = _CreateProviderConfig(messages, provider_source)

  deployment = messages.Deployment(
      name=deployment_full_name,
      serviceAccount=service_account,
      importExistingResources=import_existing_resources,
      workerPool=worker_pool,
      terraformBlueprint=tf_blueprint,
      labels=labels_message,
      annotations=annotations_message,
      tfVersionConstraint=tf_version_constraint,
      quotaValidation=quota_validation,
      providerConfig=provider_config,
  )

  if artifacts_gcs_bucket is not None:
    deployment.artifactsGcsBucket = artifacts_gcs_bucket

  is_creating_deployment = existing_deployment is None
  op = None

  if is_creating_deployment:
    op = _CreateDeploymentOp(deployment, deployment_full_name)
  else:
    op = _UpdateDeploymentOp(
        deployment, existing_deployment, deployment_full_name, labels
    )

  log.debug('LRO: %s', op.name)

  # If the user chose to run asynchronously, then we'll match the output that
  # the automatically-generated Delete command issues and return immediately.
  if async_:
    log.status.Print(
        '{0} request issued for: [{1}]'.format(
            'Create' if is_creating_deployment else 'Update', deployment_id
        )
    )

    log.status.Print('Check operation [{}] for status.'.format(op.name))

    return op

  progress_message = '{} the deployment'.format(
      'Creating' if is_creating_deployment else 'Updating'
  )

  applied_deployment = configmanager_util.WaitForApplyDeploymentOperation(
      op, progress_message
  )

  if (
      applied_deployment.state
      == messages.Deployment.StateValueValuesEnum.FAILED
  ):
    raise errors.OperationFailedError(applied_deployment.stateDetail)

  return applied_deployment


def _CreateDeploymentOp(
    deployment,
    deployment_full_name,
):
  """Initiates and returns a CreateDeployment operation.

  Args:
    deployment: A partially filled messages.Deployment. The deployment will be
      filled with other details before the operation is initiated.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".

  Returns:
    The CreateDeployment operation.
  """
  deployment_ref = resources.REGISTRY.Parse(
      deployment_full_name, collection='config.projects.locations.deployments'
  )
  location_ref = deployment_ref.Parent()
  # Get just the ID from the fully qualified name.
  deployment_id = deployment_ref.Name()

  log.info('Creating the deployment')
  return configmanager_util.CreateDeployment(
      deployment, deployment_id, location_ref.RelativeName()
  )


def _UpdateDeploymentOp(
    deployment,
    existing_deployment,
    deployment_full_name,
    labels,
):
  """Initiates and returns an UpdateDeployment operation.

  Args:
    deployment: A partially filled messages.Deployment. The deployment will be
      filled with its target (e.g. configController, gitTarget, etc.) before the
      operation is initiated.
    existing_deployment: A messages.Deployment. The existing deployment to
      update.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".
    labels: dictionary of string → string, labels to be associated with the
      deployment.

  Returns:
    The UpdateDeployment operation.
  """

  log.info('Updating the existing deployment')

  # If the user didn't specify labels here, then we don't want to overwrite
  # the existing labels on the deployment, so we provide them back to the
  # underlying API.
  if labels is None:
    deployment.labels = existing_deployment.labels

  return configmanager_util.UpdateDeployment(deployment, deployment_full_name)


def ImportStateFile(messages, deployment_full_name, lock_id, file=None):
  """Creates a signed uri to upload the state file.

  Args:
    messages: ModuleType, the messages module that lets us form Infra Manager
      API messages based on our protos.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".
    lock_id: Lock ID of the lock file, used to verify that the person
      importing owns the lock.
    file: file path of the statefile to be uploaded.

  Returns:
    A messages.StateFile which contains a signed uri to be used to upload a
    state file, or None if a local statefile was provided and uploaded.
  """

  import_state_file_request = messages.ImportStatefileRequest(
      lockId=int(lock_id),
  )

  log.status.Print('Creating signed uri for ImportStatefile request...')

  state_file = configmanager_util.ImportStateFile(
      import_state_file_request, deployment_full_name
  )
  if file is None:
    return state_file
  state_file_url = state_file.signedUri
  with files.BinaryFileReader(os.path.abspath(file)) as state_file_obj:
    requests.GetSession().put(state_file_url, data=state_file_obj)

  log.status.Print(f'Statefile {file} uploaded successfully.')
  return


def ExportDeploymentStateFile(
    messages, deployment_full_name, draft=False, file=None
):
  """Creates a signed uri to download the state file.

  Args:
    messages: ModuleType, the messages module that lets us form Infra Manager
      API messages based on our protos.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".
    draft: bool, if True, the draft state file is exported.
    file: string, the file name to download statefile to.

  Returns:
    A messages.StateFile which contains a signed uri to be used to download a
    state file, or None if the state file was written to a local file.
  """

  export_deployment_state_file_request = (
      messages.ExportDeploymentStatefileRequest(
          draft=draft,
      )
  )

  log.status.Print('Initiating export state file request...')

  state_file = configmanager_util.ExportDeploymentStateFile(
      export_deployment_state_file_request, deployment_full_name
  )
  if file is None:
    return state_file
  state_file_data = requests.GetSession().get(state_file.signedUri)
  if file.endswith(os.sep):
    file += 'statefile'
  files.WriteBinaryFileContents(file + '.tfstate', state_file_data.content)
  log.status.Print(f'Exported statefile {file}.tfstate')
  return
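

# Illustrative call (assumed names): ExportDeploymentStateFile(
#     messages, 'projects/p/locations/l/deployments/d', file='out/')
# fetches the signed uri contents and writes 'out/statefile.tfstate', since a
# trailing path separator falls back to the default basename 'statefile'.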


def ExportRevisionStateFile(messages, deployment_full_name):
  """Creates a signed uri to download the state file.

  Args:
    messages: ModuleType, the messages module that lets us form Infra Manager
      API messages based on our protos.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".

  Returns:
    A messages.StateFile which contains a signed uri to be used to download a
    state file.
  """

  export_revision_state_file_request = messages.ExportRevisionStatefileRequest()

  log.status.Print('Initiating export state file request...')

  state_file = configmanager_util.ExportRevisionStateFile(
      export_revision_state_file_request, deployment_full_name
  )

  return state_file


def ExportLock(deployment_full_name):
  """Exports lock info of the deployment.

  Args:
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".

  Returns:
    A lock info response.
  """

  log.status.Print('Initiating export lock request...')

  lock_info = configmanager_util.ExportLock(
      deployment_full_name,
  )

  return lock_info


def LockDeployment(messages, async_, deployment_full_name):
  """Locks the deployment.

  Args:
    messages: ModuleType, the messages module that lets us form Infra Manager
      API messages based on our protos.
    async_: bool, if True, gcloud will return immediately, otherwise it will
      wait on the long-running operation.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".

  Returns:
    A lock info resource or, in the case that async_ is True, a
      long-running operation.
  """

  lock_deployment_request = messages.LockDeploymentRequest()

  op = configmanager_util.LockDeployment(
      lock_deployment_request, deployment_full_name
  )

  deployment_ref = resources.REGISTRY.Parse(
      deployment_full_name, collection='config.projects.locations.deployments'
  )
  # Get just the ID from the fully qualified name.
  deployment_id = deployment_ref.Name()

  log.debug('LRO: %s', op.name)

  if async_:
    log.status.Print(
        'Lock deployment request issued for: [{0}]'.format(deployment_id)
    )

    log.status.Print('Check operation [{}] for status.'.format(op.name))
    return op

  progress_message = 'Locking the deployment'

  lock_response = configmanager_util.WaitForApplyDeploymentOperation(
      op, progress_message
  )

  if (
      lock_response.lockState
      == messages.Deployment.LockStateValueValuesEnum.LOCK_FAILED
  ):
    raise errors.OperationFailedError('Lock deployment operation failed.')

  return ExportLock(deployment_full_name)


def UnlockDeployment(
    messages,
    async_,
    deployment_full_name,
    lock_id,
):
  """Unlocks the deployment.

  Args:
    messages: ModuleType, the messages module that lets us form Infra Manager
      API messages based on our protos.
    async_: bool, if True, gcloud will return immediately, otherwise it will
      wait on the long-running operation.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".
    lock_id: Lock ID of the deployment to be unlocked.

  Returns:
    A deployment resource or, in the case that async_ is True, a
      long-running operation.
  """

  unlock_deployment_request = messages.UnlockDeploymentRequest(
      lockId=int(lock_id),
  )

  deployment_ref = resources.REGISTRY.Parse(
      deployment_full_name, collection='config.projects.locations.deployments'
  )
  # Get just the ID from the fully qualified name.
  deployment_id = deployment_ref.Name()

  op = configmanager_util.UnlockDeployment(
      unlock_deployment_request, deployment_full_name
  )

  log.debug('LRO: %s', op.name)

  if async_:
    log.status.Print(
        'Unlock deployment request issued for: [{0}]'.format(deployment_id)
    )

    log.status.Print('Check operation [{}] for status.'.format(op.name))

    return op

  progress_message = 'Unlocking the deployment'

  unlock_response = configmanager_util.WaitForApplyDeploymentOperation(
      op, progress_message
  )

  if (
      unlock_response.lockState
      == messages.Deployment.LockStateValueValuesEnum.UNLOCK_FAILED
  ):
    raise errors.OperationFailedError('Unlock deployment operation failed.')

  return unlock_response


def _CleanupGCSStagingObjectsNotInUse(
    location, deployment_full_name, deployment_id
):
  """Deletes staging object for all revisions except for last successful revision.

  Args:
    location: The location of the deployment.
    deployment_full_name: string, the fully qualified name of the deployment,
      e.g. "projects/p/locations/l/deployments/d".
    deployment_id: the short name of the deployment.

  Raises:
    NotFoundError: If the bucket or folder does not exist.
  """

  gcs_client = storage_api.StorageClient()
  gcs_staging_dir = staging_bucket_util.DefaultGCSStagingDir(
      deployment_id, location
  )
  gcs_staging_dir_ref = resources.REGISTRY.Parse(
      gcs_staging_dir, collection='storage.objects'
  )
  bucket_ref = storage_util.BucketReference(gcs_staging_dir_ref.bucket)

  staged_objects = set()
  try:
    items = gcs_client.ListBucket(bucket_ref, gcs_staging_dir_ref.object)
    for item in items:
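      # Staged object names are assumed to follow
      # '<STAGING_DIR>/<location>/<deployment>/<timestamp>-<uuid>/<file>', so
      # the first four segments identify one revision's staged folder.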
      item_dir = '/'.join(item.name.split('/')[:4])
      staged_objects.add(
          'gs://{0}/{1}'.format(gcs_staging_dir_ref.bucket, item_dir)
      )
    if not staged_objects:
      return
  except storage_api.BucketNotFoundError:
    # if staging bucket does not exist, there is nothing to clean.
    return

  op = configmanager_util.ListRevisions(deployment_full_name)
  revisions = sorted(
      op.revisions, key=lambda x: GetRevisionNumber(x.name), reverse=True
  )

  # Keep the gcsSource of the latest revision from being deleted.
  latest_revision = revisions[0]
  if latest_revision.terraformBlueprint is not None:
    staged_objects.discard(latest_revision.terraformBlueprint.gcsSource)

  # Also keep the staged object of the last successful (APPLIED) revision.
  for revision in revisions:
    if str(revision.state) == 'APPLIED':
      if revision.terraformBlueprint is not None:
        staged_objects.discard(revision.terraformBlueprint.gcsSource)
      break

  for obj in staged_objects:
    staging_bucket_util.DeleteStagingGCSFolder(gcs_client, obj)


def GetRevisionNumber(revision_full_name):
  """Returns the revision number from the revision name.

  e.g. returns 12 for
  "projects/p1/locations/l1/deployments/d1/revisions/r-12".

  Args:
    revision_full_name: string, the fully qualified name of the revision, e.g.
      "projects/p/locations/l/deployments/d/revisions/r-12".

  Returns:
    a revision number.
  """
  revision_ref = resources.REGISTRY.Parse(
      revision_full_name,
      collection='config.projects.locations.deployments.revisions',
  )
  revision_short_name = revision_ref.Name()
  return int(revision_short_name[2:])
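

# Example (illustrative): a revision short name of 'r-12' yields 12; the
# slice [2:] above assumes short names always carry the 'r-' prefix.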


def ExportPreviewResult(messages, preview_full_name, file=None):
  """Creates a signed uri to download the preview results.

  Args:
    messages: ModuleType, the messages module that lets us form Infra Manager
      API messages based on our protos.
    preview_full_name: string, the fully qualified name of the preview,
      e.g. "projects/p/locations/l/previews/p".
    file: string, the file name to download results to.

  Returns:
    A messages.ExportPreviewResultResponse which contains signed uris to
    download the binary and json plan files, or None if the results were
    written to local files.
  """

  export_preview_result_request = messages.ExportPreviewResultRequest()

  log.status.Print('Initiating export preview results...')

  preview_result_resp = configmanager_util.ExportPreviewResult(
      export_preview_result_request, preview_full_name
  )
  if file is None:
    return preview_result_resp
  binary_data = requests.GetSession().get(
      preview_result_resp.result.binarySignedUri
  )
  json_data = requests.GetSession().get(
      preview_result_resp.result.jsonSignedUri
  )
  if file.endswith(os.sep):
    file += 'preview'
  files.WriteBinaryFileContents(file + '.tfplan', binary_data.content)
  files.WriteBinaryFileContents(file + '.json', json_data.content)
  log.status.Print(
      f'Exported preview artifacts {file}.tfplan and {file}.json'
  )
  return


def Create(
    messages,
    async_,
    preview_full_name,
    location_full_name,
    deployment,
    preview_mode,
    service_account,
    local_source=None,
    stage_bucket=None,
    ignore_file=None,
    artifacts_gcs_bucket=None,
    worker_pool=None,
    gcs_source=None,
    git_source_repo=None,
    git_source_directory=None,
    git_source_ref=None,
    input_values=None,
    inputs_file=None,
    labels=None,
    annotations=None,
    provider_source=None,
    tf_version_constraint=None,
):
  """Creates a preview.

  Args:
    messages: ModuleType, the messages module that lets us form blueprints API
      messages based on our protos.
    async_: bool, if True, gcloud will return immediately, otherwise it will
      wait on the long-running operation.
    preview_full_name: string, full path name of the preview. e.g.
      "projects/p/locations/l/previews/preview".
    location_full_name: string, full path name of the location. e.g.
      "projects/p/locations/l".
    deployment: deployment reference for a preview.
    preview_mode: preview mode to be set for a preview.
    service_account: User-specified Service Account (SA) to be used as
      credential to manage resources. e.g.
      `projects/{projectID}/serviceAccounts/{serviceAccount}`. The default
      Cloud Build SA will be used initially if this field is not set.
    local_source: Local storage path where config files are stored.
    stage_bucket: optional string. Destination for storing local config files
      specified by local source flag. e.g. "gs://bucket-name/".
    ignore_file: optional string, a path to a gcloudignore file.
    artifacts_gcs_bucket: User-defined location of Cloud Build logs and
      artifacts in Google Cloud Storage, e.g. `gs://{bucket}/{folder}`. A
      default bucket will be bootstrapped if the field is not set or empty.
    worker_pool: The User-specified Worker Pool resource in which the Cloud
      Build job will execute. If this field is unspecified, the default Cloud
      Build worker pool will be used. e.g.
      projects/{project}/locations/{location}/workerPools/{workerPoolId}
    gcs_source: URI of an object in Google Cloud Storage, e.g.
      `gs://{bucket}/{object}`
    git_source_repo: Repository URL.
    git_source_directory: Subdirectory inside the git repository.
    git_source_ref: Git branch or tag.
    input_values: Input variable values for the Terraform blueprint. It only
      accepts (key, value) pairs where value is a scalar value.
    inputs_file: Accepts .tfvars file.
    labels: User-defined metadata for the preview.
    annotations: User-defined annotations for the preview.
    provider_source: Input to control where to fetch providers.
    tf_version_constraint: User-specified Terraform version constraint, for
      example, "=1.3.10".

  Returns:
    The resulting Preview resource or, in the case that async_ is True, a
      long-running operation.

  Raises:
    InvalidArgumentException: If an invalid set of flags is provided (e.g.
      trying to run with --target-git-subdir but without --target-git).
  """

  additional_properties = _ParseInputValuesAndFile(
      input_values, inputs_file, messages)
  labels_message, annotations_message = _GeneratesLabelsAnnotations(
      messages.Preview(), labels, annotations
  )

  tf_input_values = messages.TerraformBlueprint.InputValuesValue(
      additionalProperties=additional_properties
  )

  if preview_full_name is not None:
    preview_ref = resources.REGISTRY.Parse(
        preview_full_name, collection='config.projects.locations.previews'
    )
    location = preview_ref.Parent().Name()
    preview_id = preview_ref.Name()
  else:
    location_ref = resources.REGISTRY.Parse(
        location_full_name, collection='config.projects.locations'
    )
    location = location_ref.Name()
    # Generate a preview_id when preview_full_name is None, as it's needed for
    # bucket creation for a local source in the _CreateTFBlueprint workflow.
    preview_id = 'im-preview-{uuid}'.format(uuid=uuid.uuid4())

  tf_blueprint = _CreateTFBlueprint(
      messages,
      None,
      preview_id,
      location,
      local_source,
      stage_bucket,
      ignore_file,
      gcs_source,
      git_source_repo,
      git_source_directory,
      git_source_ref,
      tf_input_values,
  )
  provider_config = _CreateProviderConfig(messages, provider_source)

  preview = messages.Preview(
      serviceAccount=service_account,
      workerPool=worker_pool,
      labels=labels_message,
      annotations=annotations_message,
      providerConfig=provider_config,
      tfVersionConstraint=tf_version_constraint,
  )

  # Set tf_blueprint only when one of the three sources is specified.
  if any(s is not None for s in (gcs_source, local_source, git_source_repo)):
    preview.terraformBlueprint = tf_blueprint

  if preview_full_name is not None:
    preview.name = preview_full_name

  if preview_mode is not None:
    try:
      preview.previewMode = messages.Preview.PreviewModeValueValuesEnum(
          preview_mode
      )
    except TypeError:
      raise c_exceptions.UnknownArgumentException(
          '--preview-mode',
          'Invalid preview mode. Supported values are DEFAULT and DELETE.')

  if deployment is not None:
    preview.deployment = deployment

  if artifacts_gcs_bucket is not None:
    preview.artifactsGcsBucket = artifacts_gcs_bucket

  op = _CreatePreviewOp(preview, preview_full_name, location_full_name)

  log.debug('LRO: %s', op.name)

  # If the user chose to run asynchronously, then we'll match the output that
  # the automatically-generated Delete command issues and return immediately.
  if async_:
    log.status.Print('Create request issued for: [{0}]'.format(preview_id))

    log.status.Print('Check operation [{}] for status.'.format(op.name))

    return op

  progress_message = 'Creating the preview'

  applied_preview = configmanager_util.WaitForCreatePreviewOperation(
      op, progress_message
  )

  if (
      applied_preview.state
      == messages.Preview.StateValueValuesEnum.FAILED
  ):
    raise errors.OperationFailedError(applied_preview.errorStatus.message)

  log.status.Print('Created preview: {}'.format(applied_preview.name))
  return applied_preview


def _CreatePreviewOp(preview, preview_full_name, location_full_name):
  """Initiates and returns a CreatePreview operation.

  Args:
    preview: A partially filled messages.Preview. The preview will be filled
      with other details before the operation is initiated.
    preview_full_name: string, the fully qualified name of the preview, e.g.
      "projects/p/locations/l/previews/p".
    location_full_name: string, the fully qualified name of the location, e.g.
      "projects/p/locations/l".

  Returns:
    The CreatePreview operation.
  """
  if preview_full_name is None:
    return configmanager_util.CreatePreview(preview, None, location_full_name)
  preview_ref = resources.REGISTRY.Parse(
      preview_full_name, collection='config.projects.locations.previews')
  preview_id = preview_ref.Name()
  log.info('Creating the preview')
  return configmanager_util.CreatePreview(
      preview, preview_id, location_full_name
  )


def _ParseInputValuesAndFile(input_values, inputs_file, messages):
  """Parses input file or values and returns a list of additional properties.

  Args:
    input_values: Input variable values for the Terraform blueprint. It only
      accepts (key, value) pairs where value is a scalar value.
    inputs_file: Accepts .tfvars file.
    messages: ModuleType, the messages module that lets us form blueprints API
      messages based on our protos.

  Returns:
    The additional_properties list.
  """
  additional_properties = []
  if input_values is not None:
    for key, value in six.iteritems(input_values):
      additional_properties.append(
          messages.TerraformBlueprint.InputValuesValue.AdditionalProperty(
              key=key,
              value=messages.TerraformVariable(
                  inputValue=encoding.PyValueToMessage(
                      extra_types.JsonValue, value
                  )
              ),
          )
      )
  elif inputs_file is not None:
    if sys.version_info < (3, 6):
      raise errors.InvalidDataError(
          '--inputs-file flag is only supported for Python version 3.6 and'
          ' above.'
      )
    tfvar_values = tfvars_parser.ParseTFvarFile(inputs_file)
    for key, value in six.iteritems(tfvar_values):
      additional_properties.append(
          messages.TerraformBlueprint.InputValuesValue.AdditionalProperty(
              key=key,
              value=messages.TerraformVariable(
                  inputValue=encoding.PyValueToMessage(
                      extra_types.JsonValue, value
                  )
              ),
          )
      )
  return additional_properties


def _GeneratesLabelsAnnotations(resource, labels=None, annotations=None):
  """Generates labels and annotations messages.

  Args:
    resource: Resource type, can be deployment or preview.
    labels: dict[str,str], labels to be associated with the resource.
    annotations: dict[str,str], annotations to be associated with the resource.

  Returns:
    Label and annotation messages.
  """
  labels_message = {}
  annotations_message = {}
  # Whichever labels the user provides will become the full set of labels in
  # the resulting deployment or preview.
  if labels is not None:
    labels_message = resource.LabelsValue(
        additionalProperties=[
            resource.LabelsValue.AdditionalProperty(key=key, value=value)
            for key, value in six.iteritems(labels)
        ]
    )

  if annotations is not None:
    annotations_message = resource.AnnotationsValue(
        additionalProperties=[
            resource.AnnotationsValue.AdditionalProperty(key=key, value=value)
            for key, value in six.iteritems(annotations)
        ]
    )
  return labels_message, annotations_message
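

# A minimal sketch of the resulting message shape, using assumed values:
# labels={'env': 'dev'} on a deployment becomes
# Deployment.LabelsValue(additionalProperties=[
#     Deployment.LabelsValue.AdditionalProperty(key='env', value='dev')]).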