HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/apigee/archives.py
# -*- coding: utf-8 -*- # Lint as: python3
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for working with Apigee archive deployments."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import zipfile

from googlecloudsdk.api_lib import apigee
from googlecloudsdk.command_lib.apigee import errors
from googlecloudsdk.core import log
from googlecloudsdk.core import requests
from googlecloudsdk.core.resource import resource_projector
from googlecloudsdk.core.util import archive
from googlecloudsdk.core.util import files
from six.moves import urllib


class LocalDirectoryArchive(object):
  """Manages a local zip archive."""

  # The archive file name to save to.
  _ARCHIVE_FILE_NAME = 'apigee_archive_deployment.zip'
  # File extensions Apigee supports in an archive deployment. All entries are
  # stored lowercase so the extension check below can compare
  # case-insensitively; in particular, ".securityPolicy" files on disk match
  # the ".securitypolicy" entry here. A frozenset gives O(1) membership tests.
  _APIGEE_ARCHIVE_FILE_EXTENSIONS = frozenset([
      '.graphql',
      '.jar',
      '.java',
      '.js',
      '.jsc',
      '.json',
      '.oas',
      '.properties',
      '.py',
      '.securitypolicy',
      '.wsdl',
      '.xml',
      '.xsd',
      '.xsl',
      '.yaml',
      '.yml',
  ])
  # Only paths under this directory (relative to the source directory) are
  # included in the archive.
  _ARCHIVE_ROOT = os.path.join('src', 'main', 'apigee')

  def __init__(self, src_dir):
    """Initializes the archive helper.

    Args:
      src_dir: A string path of the directory to archive, or None to use the
        current working directory.

    Raises:
      files.MissingFileError: If src_dir does not exist.
      errors.SourcePathIsNotDirectoryError: If src_dir is not a directory.
    """
    self._CheckIfPathExists(src_dir)
    # Check if the path resolves to a directory.
    if src_dir and not os.path.isdir(src_dir):
      raise errors.SourcePathIsNotDirectoryError(src_dir)
    self._src_dir = src_dir if src_dir is not None else files.GetCWD()
    self._tmp_dir = files.TemporaryDirectory()

  def _CheckIfPathExists(self, path):
    """Checks that the given file path exists.

    Args:
      path: A string of the path to check; None and '' are skipped.

    Raises:
      files.MissingFileError: If the path does not exist.
    """
    if path and not os.path.exists(path):
      raise files.MissingFileError(
          'Path to archive deployment does not exist: {}'.format(path))

  def _ZipFileFilter(self, file_name):
    """Filter all files in the archive directory to only allow Apigee files.

    Args:
      file_name: A string of a path relative to the source directory.

    Returns:
      True if the path should be included in the archive.
    """
    if not file_name.startswith(self._ARCHIVE_ROOT):
      return False
    _, ext = os.path.splitext(file_name)
    full_path = os.path.join(self._src_dir, file_name)
    # Skip hidden unix directories. Assume hidden directories and the files
    # within them are not intended to be included. This check needs to happen
    # first so MakeZipFromDir does not continue to process the files within the
    # hidden directory which can contain the same file types that Apigee
    # supports.
    if os.path.basename(full_path).startswith('.'):
      return False
    # MakeZipFromDir will only process files in a directory if the containing
    # directory first evaluates to True, so all directories are approved here.
    if os.path.isdir(full_path):
      return True
    # Only include Apigee supported file extensions (compared
    # case-insensitively against the lowercase entries above).
    return (os.path.isfile(full_path) and
            ext.lower() in self._APIGEE_ARCHIVE_FILE_EXTENSIONS)

  def Zip(self):
    """Creates a zip archive of the specified directory.

    Returns:
      A string of the zip file path inside the temporary directory.
    """
    dst_file = os.path.join(self._tmp_dir.path, self._ARCHIVE_FILE_NAME)
    archive.MakeZipFromDir(dst_file, self._src_dir, self._ZipFileFilter)
    return dst_file

  def ValidateZipFilePath(self, zip_path):
    """Checks that the zip file path exists and the file is a zip archive.

    Args:
      zip_path: A string of the path to validate.

    Raises:
      files.MissingFileError: If the path does not exist.
      errors.BundleFileNotValidError: If the file is not a valid zip archive.
    """
    self._CheckIfPathExists(zip_path)
    if not zipfile.is_zipfile(zip_path):
      raise errors.BundleFileNotValidError(zip_path)

  def Close(self):
    """Deletes the local temporary directory."""
    return self._tmp_dir.Close()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, val, tb):
    # Best-effort cleanup. NOTE(review): returning True when Close() fails
    # also suppresses any in-flight exception — presumably intentional so
    # cleanup problems never mask or abort the command; confirm before
    # changing.
    try:
      self.Close()
    except:  # pylint: disable=bare-except
      log.warning('Temporary directory was not successfully deleted.')
      return True


def GetUploadFileId(upload_url):
  """Extracts the upload file id from a signed upload URL.

  Archive deployments are uploaded to a signed URL shaped like:
  https://storage.googleapis.com/<bucket id>/<file id>.zip?<additional headers>
  The file id is the final component of the URL path (e.g., <file id>.zip).

  Args:
    upload_url: A string of the signed URL.

  Returns:
    A string of the file id.
  """
  parsed = urllib.parse.urlparse(upload_url)
  # Everything after the last "/" in the path; the whole path if none.
  _, _, file_id = parsed.path.rpartition('/')
  return file_id


def UploadArchive(upload_url, zip_file):
  """Uploads the specified zip file with a PUT request to the provided URL.

  Args:
    upload_url: A string of the URL to send the PUT request to. Required to be a
      signed URL from GCS.
    zip_file: A string of the file path to the zip file to upload.

  Returns:
    A requests.Response object.
  """
  # Headers required by the Apigee-generated signed URL.
  put_headers = {
      'content-type': 'application/zip',
      'x-goog-content-length-range': '0,1073741824',
  }
  session = requests.GetSession()
  with files.BinaryFileReader(zip_file) as body:
    return session.put(upload_url, data=body, headers=put_headers)


class ListArchives(object):
  """Adds additional helpful fields to a list of archives."""

  def __init__(self, org):
    """Initializes the helper.

    Args:
      org: A string of the Apigee organization id.
    """
    self._org = org
    self._lro_helper = apigee.LROPoller(org)
    # Human-readable operation status values surfaced to the user.
    self._deployed_status = 'Deployed'
    self._inprogress_status = 'In Progress'
    self._failed_status = 'Failed'
    self._not_found_status = 'Not Found'
    self._unknown_status = 'Unknown'
    self._missing_status = 'Missing'

  def ExtendedArchives(self, archives):
    """Given a list of archives, extends them with a status and error field where needed.

    Args:
      archives: A list of archives to extend with a status and potential error.

    Returns:
      A list of archives with their associated status.
    """
    extended_archives = sorted(
        archives, key=lambda k: k['createdAt'], reverse=True)

    # Go through the list of sorted archives and attempt to find the first
    # "Deployed" archive, at which point we can return to the user.
    # If at any point there is an error, other than "Not Found", from
    # describing an operation, set the current archive and all following
    # archives to the "Unknown" state. This is due to the constraint of no
    # longer being able to accurately determine a "Deployed" state at that
    # point in time (as state is time dependent).
    cascade_unknown = False
    for idx, a in enumerate(extended_archives):
      serialized_archive = resource_projector.MakeSerializable(a)
      if cascade_unknown:
        serialized_archive['operationStatus'] = self._unknown_status
      elif 'operation' in a:
        uuid = apigee.OperationsClient.SplitName({'name': a['operation']
                                                 })['uuid']
        try:
          op = apigee.OperationsClient.Describe({
              'organizationsId': self._org,
              'operationsId': uuid
          })
          status = self._StatusFromOperation(op)
          serialized_archive['operationStatus'] = status['status']
          if status['status'] == self._deployed_status:
            extended_archives[idx] = serialized_archive
            return extended_archives
          elif 'error' in status:
            serialized_archive['error'] = status['error']
        except errors.EntityNotFoundError:
          serialized_archive['operationStatus'] = self._not_found_status
        except:  # pylint: disable=bare-except
          # Any other failure poisons all remaining (older) archives: without
          # this operation's state the deployed archive can no longer be
          # determined accurately, so mark the rest "Unknown".
          cascade_unknown = True
          serialized_archive['operationStatus'] = self._unknown_status
      else:  # Archive didn't have an operation on it to query.
        serialized_archive['operationStatus'] = self._missing_status
      # Set the new archive with status (and potential error) fields filled out.
      extended_archives[idx] = serialized_archive
    return extended_archives

  def _StatusFromOperation(self, op):
    """Given an LRO, determines the associated archive status.

    Args:
      op: An Apigee LRO.

    Returns:
      A dict in the format of
        {"status": "{status}", "error": "{error if present on LRO}"}.
    """
    status = {}
    try:
      is_done = self._lro_helper.IsDone(op)
      if is_done:
        # We found the currently deployed archive, no need to continue
        # iterating and set other statuses.
        status['status'] = self._deployed_status
      else:
        # Archive deployment is in progress.
        status['status'] = self._inprogress_status
    except errors.RequestError:
      # Archive's operation did not complete successfully, mark as failed.
      status['status'] = self._failed_status
      # Add the error to the serialized JSON, but this will not be printed
      # in the output columns.
      status['error'] = op['error']['message']
    return status