HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/storage/tasks/task_buffer.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements a buffer for tasks used in task_graph_executor.

See go/parallel-processing-in-gcloud-storage for more information.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import copy

from six.moves import queue


# First entry of the text built by TaskBuffer.__str__ for a non-empty buffer.
# The entries are joined with '\n', so the trailing newline here leaves a
# blank line between the header and the first listed task.
BUFFER_HEADER = 'Buffer Contents:\n'
# Text returned by TaskBuffer.__str__ when the buffer holds no items.
BUFFER_EMPTY_MESSAGE = 'Task Buffer is empty.'


class _PriorityWrapper:
  """Wraps a buffered task and tracks priority information.

  Attributes:
    task (Union[task.Task, str]): A buffered item. Expected to be a task or a
      string (to handle shutdowns) when used by task_graph_executor.
    priority (int): The priority of this task. A task with a lower value will be
      executed before a task with a higher value, since queue.PriorityQueue uses
      a min-heap.
  """

  def __init__(self, task, priority):
    self.task = task
    self.priority = priority

  def __lt__(self, other):
    return self.priority < other.priority


class TaskBuffer:
  """Stores and prioritizes tasks.

  The current implementation uses a queue.PriorityQueue under the hood, since
  in experiments we found that the heap it maintains did not add too much
  overhead. If it does end up being a bottleneck, the same API can be
  implemented with a collections.deque.
  """

  def __init__(self):
    """Creates an empty buffer."""
    self._queue = queue.PriorityQueue()

  def get(self):
    """Removes and returns an item from the buffer.

    Calls to `get` block if there are no elements in the queue, and return
    prioritized items before non-prioritized items.

    Returns:
      A buffered item. Expected to be a task or a string (to handle shutdowns)
      when used by task_graph_executor.
    """
    return self._queue.get().task

  def put(self, task, prioritize=False):
    """Adds an item to the buffer.

    Args:
      task (Union[task.Task, str]): A buffered item. Expected to be a task or a
        string (to handle shutdowns) when used by task_graph_executor.
      prioritize (bool): Tasks added with prioritize=True will be returned by
        `get` before tasks added with prioritize=False.
    """
    # Lower values are served first, so prioritized items get 0 and all other
    # items get 1.
    priority = 0 if prioritize else 1
    prioritized_item = _PriorityWrapper(task, priority)
    self._queue.put(prioritized_item)

  def size(self) -> int:
    """Returns the number of items in the buffer."""
    return self._queue.qsize()

  def __str__(self):
    """Returns a string representation of the buffer.

    The buffer is not modified. Items are listed in the order in which the
    underlying queue currently stores them (its internal heap layout), which
    is not necessarily the order in which `get` would return them.

    Returns:
      str: BUFFER_EMPTY_MESSAGE if the buffer holds no items. Otherwise
      BUFFER_HEADER followed by str() of each buffered item, joined with
      newlines.
    """
    # A shallow snapshot is all that is needed here: the items are only read,
    # never changed. Deep-copying them would be wasted work and raises for
    # items that cannot be copied (for example ones holding a lock).
    buffered_items = list(self._queue.queue)
    # Decide emptiness from the snapshot itself so the check and the listing
    # below always describe the same state of the buffer.
    if not buffered_items:
      return BUFFER_EMPTY_MESSAGE

    output_lines = [BUFFER_HEADER]
    output_lines.extend(str(wrapper.task) for wrapper in buffered_items)
    return '\n'.join(output_lines)