File: /snap/google-cloud-cli/current/lib/surface/storage/rm.py
# -*- coding: utf-8 -*- #
# Copyright 2021 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of rm command for deleting resources."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections

from googlecloudsdk.api_lib.storage import cloud_api
from googlecloudsdk.api_lib.storage import errors as api_errors
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.storage import flags
from googlecloudsdk.command_lib.storage import folder_util
from googlecloudsdk.command_lib.storage import name_expansion
from googlecloudsdk.command_lib.storage import plurality_checkable_iterator
from googlecloudsdk.command_lib.storage import rm_command_util
from googlecloudsdk.command_lib.storage import stdin_iterator
from googlecloudsdk.command_lib.storage import user_request_args_factory
from googlecloudsdk.command_lib.storage.tasks import task_executor
from googlecloudsdk.command_lib.storage.tasks import task_graph_executor
from googlecloudsdk.command_lib.storage.tasks import task_status
from googlecloudsdk.command_lib.storage.tasks.rm import delete_task_iterator_factory
from googlecloudsdk.core import log


@base.UniverseCompatible
class Rm(base.Command):
  """Delete objects and buckets."""

  detailed_help = {
      'DESCRIPTION':
          """
      Delete objects and buckets.
      """,
      'EXAMPLES':
          """

      The following command deletes a Cloud Storage object named ``my-object''
      from the bucket ``my-bucket'':

        $ {command} gs://my-bucket/my-object

      The following command deletes all objects directly within the directory
      ``my-dir'' but no objects within subdirectories:

        $ {command} gs://my-bucket/my-dir/*

      The following command deletes all objects and subdirectories within the
      directory ``my-dir'':

        $ {command} gs://my-bucket/my-dir/**

      Note that for buckets that contain
      [versioned objects](https://cloud.google.com/storage/docs/object-versioning),
      the above command only affects live versions. Use the `--recursive` flag
      instead to delete all versions.

      The following command deletes all versions of all resources in
      ``my-bucket'' and then deletes the bucket:

        $ {command} --recursive gs://my-bucket/

      The following command deletes all text files at the top level of
      ``my-bucket'', but not text files in subdirectories:

        $ {command} gs://my-bucket/*.txt

      The following command deletes the resources matched by the URLs or
      wildcard expressions read from stdin, one per line:

        $ some_program | {command} -I
      """,
  }

  @classmethod
  def Args(cls, parser):
    parser.add_argument(
        'urls',
        nargs='*',
        help='The URLs of the resources to delete.')
    parser.add_argument(
        '--recursive',
        '-R',
        '-r',
        action='store_true',
        help=(
            'Recursively delete the contents of buckets or directories that'
            ' match the path expression.'
            ' By default, this will delete managed folders as well.'
            ' If the path is set to a bucket, like'
            " ``gs://bucket'', the bucket is also deleted. This option"
            ' implies the `--all-versions` option. If you want to delete only'
            " live object versions, use the ``**'' wildcard instead."
        ),
    )
    parser.add_argument(
        '--all-versions',
        '-a',
        action='store_true',
        help='Delete all'
        ' [versions](https://cloud.google.com/storage/docs/object-versioning)'
        ' of an object.')

    parser.add_argument(
        '--exclude-managed-folders',
        action='store_true',
        default=False,
        help=(
            'Excludes managed folders from command operations. By default,'
            ' gcloud storage includes managed folders in recursive removals.'
            ' Note that this flag is not applicable to hierarchical namespace'
            ' buckets, because managed folders are always listed for these'
            ' buckets.'
        ),
    )

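    # Flags shared across storage commands: custom request headers,
    # --continue-on-error, precondition flags, and --read-paths-from-stdin
    # (the -I option used in the examples above).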
    flags.add_additional_headers_flag(parser)
    flags.add_continue_on_error_flag(parser)
    flags.add_precondition_flags(parser)
    flags.add_read_paths_from_stdin_flag(parser)

  def Run(self, args):
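    # Deletion runs in up to four phases, in order: objects, folders (only
    # with --recursive), managed folders (only with --recursive and without
    # --exclude-managed-folders), and finally buckets (only with --recursive).
    # Each phase produces its own exit code, combined at the end.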

    if args.recursive:
      bucket_setting = name_expansion.BucketSetting.YES
      object_state = cloud_api.ObjectState.LIVE_AND_NONCURRENT
      recursion_setting = name_expansion.RecursionSetting.YES
    else:
      bucket_setting = name_expansion.BucketSetting.NO
      object_state = flags.get_object_state_from_flags(args)
      recursion_setting = name_expansion.RecursionSetting.NO_WITH_WARNING

    should_perform_managed_folder_operations = (
        args.recursive and not args.exclude_managed_folders
    )

    url_found_match_tracker = collections.OrderedDict()
    # If additional lookups for folders or managed folders will follow, we
    # must not raise an error for unmatched URLs at this stage.
    # With --recursive, a folder lookup always runs, so that lookup raises the
    # error when managed folders are excluded; otherwise the error is deferred
    # to the managed-folder lookup stage.
    # Without --recursive, neither folder nor managed-folder lookups run, so
    # this is the right place to raise the error.
    name_expansion_iterator = name_expansion.NameExpansionIterator(
        stdin_iterator.get_urls_iterable(args.urls, args.read_paths_from_stdin),
        fields_scope=cloud_api.FieldsScope.SHORT,
        include_buckets=bucket_setting,
        managed_folder_setting=folder_util.ManagedFolderSetting.DO_NOT_LIST,
        folder_setting=folder_util.FolderSetting.LIST_AS_PREFIXES,
        object_state=object_state,
        raise_error_for_unmatched_urls=not args.recursive,
        recursion_requested=recursion_setting,
        url_found_match_tracker=url_found_match_tracker,
    )

    user_request_args = (
        user_request_args_factory.get_user_request_args_from_command_args(args))
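    # Worker tasks report status through this multiprocessing queue, which
    # drives the progress output configured via ProgressManagerArgs below.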
    task_status_queue = task_graph_executor.multiprocessing_context.Queue()
    task_iterator_factory = (
        delete_task_iterator_factory.DeleteTaskIteratorFactory(
            name_expansion_iterator,
            task_status_queue=task_status_queue,
            user_request_args=user_request_args))

    log.status.Print('Removing objects:')
    object_exit_code = task_executor.execute_tasks(
        task_iterator_factory.object_iterator(),
        parallelizable=True,
        task_status_queue=task_status_queue,
        progress_manager_args=task_status.ProgressManagerArgs(
            increment_type=task_status.IncrementType.INTEGER, manifest_path=None
        ),
        continue_on_error=args.continue_on_error,
    )

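    # Folder resources (hierarchical namespace buckets) are removed after the
    # objects they contain; this pass only runs with --recursive.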
    if args.recursive:
      folder_expansion_iterator = name_expansion.NameExpansionIterator(
          args.urls,
          folder_setting=folder_util.FolderSetting.LIST_AS_FOLDERS,
          raise_error_for_unmatched_urls=not should_perform_managed_folder_operations,
          recursion_requested=recursion_setting,
          url_found_match_tracker=url_found_match_tracker,
      )
      try:
        folder_exit_code = rm_command_util.remove_folders(
            folder_expansion_iterator,
            task_status_queue,
            verbose=True,
        )
      except api_errors.GcsApiError as error:
        if error.payload.status_code != 403:
          # Avoids unexpectedly escalating permissions.
          raise
        log.warning('Unable to delete folders due to missing permissions.')
        folder_exit_code = 0
    else:
      folder_exit_code = 0

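    # Managed folders are removed after regular folders. As with the folder
    # pass above, a 403 is downgraded to a warning so that missing
    # managed-folder permissions do not fail the rest of the removal.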
    if should_perform_managed_folder_operations:
      managed_folder_expansion_iterator = name_expansion.NameExpansionIterator(
          args.urls,
          managed_folder_setting=folder_util.ManagedFolderSetting.LIST_WITHOUT_OBJECTS,
          raise_error_for_unmatched_urls=True,
          # `rm` defaults to including managed folders, but this will raise a
          # precondition error if the command targets a non-UBLA bucket. These
          # errors should be silenced.
          raise_managed_folder_precondition_errors=False,
          recursion_requested=name_expansion.RecursionSetting.YES,
          url_found_match_tracker=url_found_match_tracker,
      )
      try:
        managed_folder_exit_code = rm_command_util.remove_managed_folders(
            args,
            managed_folder_expansion_iterator,
            task_status_queue,
            verbose=True,
        )
      except api_errors.GcsApiError as error:
        if error.payload.status_code != 403:
          # Avoids unexpectedly escalating permissions.
          raise
        log.warning(
            'Unable to delete managed folders due to missing permissions.'
        )
        managed_folder_exit_code = 0
    else:
      managed_folder_exit_code = 0

    bucket_iterator = plurality_checkable_iterator.PluralityCheckableIterator(
        task_iterator_factory.bucket_iterator()
    )

    # We perform the is_empty check to avoid printing unnecessary status lines.
    if args.recursive and not bucket_iterator.is_empty():
      log.status.Print('Removing buckets:')
      bucket_exit_code = task_executor.execute_tasks(
          bucket_iterator,
          parallelizable=True,
          task_status_queue=task_status_queue,
          progress_manager_args=task_status.ProgressManagerArgs(
              increment_type=task_status.IncrementType.INTEGER,
              manifest_path=None,
          ),
          continue_on_error=args.continue_on_error,
      )
    else:
      bucket_exit_code = 0

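    # Surface the worst result: any phase that failed makes the overall
    # command exit non-zero.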
    self.exit_code = max(
        object_exit_code,
        managed_folder_exit_code,
        folder_exit_code,
        bucket_exit_code,
    )