HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/storage/posix_util.py
# -*- coding: utf-8 -*- #
# Copyright 2022 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""POSIX utilities for storage commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import datetime
import os
import stat

from googlecloudsdk.command_lib.storage import errors
from googlecloudsdk.command_lib.storage import storage_url
from googlecloudsdk.command_lib.storage.resources import resource_reference
from googlecloudsdk.core import log
from googlecloudsdk.core.cache import function_result_cache
from googlecloudsdk.core.util import platforms

# Error message templates used by `raise_if_invalid_file_permissions`.
#
# Placeholders, in order: object URL string, UID found in the metadata.
_MISSING_UID_FORMAT = (
    "UID in {} metadata doesn't exist on current system. UID: {}")
# Placeholders, in order: object URL string, GID found in the metadata.
_MISSING_GID_FORMAT = (
    "GID in {} metadata doesn't exist on current system. GID: {}")
# Placeholders, in order: object URL string, UID, octal mode string.
_INSUFFICIENT_USER_READ_ACCESS_FORMAT = (
    'Insufficient access to local destination to apply {}. User {} owns'
    ' file, but owner does not have read permission in mode {}.'
)
# Placeholders, in order: object URL string, GID (or a '[user primary group]'
# marker when the metadata has no GID), octal mode string.
_INSUFFICIENT_GROUP_READ_ACCESS_FORMAT = (
    'Insufficient access to local destination to apply {}. Group {}'
    ' would own file, but group does not have read permission in mode {}.'
)
# Placeholders, in order: object URL string, UID, octal mode string.
_INSUFFICIENT_OTHER_READ_ACCESS_FORMAT = (
    'Insufficient access to local destination to apply {}. UID {} is not'
    ' owner of file, and user is not in a group that owns the file. Users in'
    ' "other" category do not have read permission in mode {}.'
)

# For transporting POSIX info through an object's custom metadata.
# Every value is stored as a string: times and IDs as base-ten integers, the
# mode as an octal permissions string.
ATIME_METADATA_KEY = 'goog-reserved-file-atime'
GID_METADATA_KEY = 'goog-reserved-posix-gid'
MODE_METADATA_KEY = 'goog-reserved-posix-mode'
MTIME_METADATA_KEY = 'goog-reserved-file-mtime'
UID_METADATA_KEY = 'goog-reserved-posix-uid'

# Tolerance applied when rejecting metadata timestamps that lie in the future.
_SECONDS_PER_DAY = 86400


def convert_base_ten_to_base_eight_str(base_ten_int):
  """Returns the permission bits of an int as a three-digit octal string.

  Only the lowest nine bits (the user, group, and other permission triplets)
  are kept, so any higher bits in the input are dropped. The result is always
  zero-padded to exactly three characters.

  Examples:
    73 -> '111'
    1 -> '001'
    0o11123 -> '123'

  Args:
    base_ten_int (int): Mode value, e.g. `st_mode` from `os.stat`.

  Returns:
    str: Exactly three octal digits.
  """
  # Mask first, then format. Slicing the text produced by `oct()` gave the
  # same answer for non-negative input but left a stray 'o' in the result for
  # negative values.
  return '{:03o}'.format(base_ten_int & 0o777)


def convert_base_eight_str_to_base_ten_int(base_eight_str):
  """Parses an octal string and returns its integer value.

  Example: '111' -> 73.

  Args:
    base_eight_str (str): Integer written in base eight.

  Returns:
    int: The parsed value.

  Raises:
    ValueError: The string is not a valid base-eight integer.
  """
  octal_radix = 8
  return int(base_eight_str, octal_radix)


class PosixMode:
  """Stores POSIX mode in all useful formats.

  Attributes:
    base_ten_int (int): Permission bits as an ordinary integer, e.g. 73.
    base_eight_str (str): The same permissions as an octal string, e.g. '111'.
  """

  def __init__(self, base_ten_int, base_eight_str):
    """Initializes class. Prefer the 'from' constructors below.

    Args:
      base_ten_int (int): Permission bits as an integer.
      base_eight_str (str): Permission bits as an octal string. Not checked
        for agreement with `base_ten_int`; the 'from' constructors derive one
        value from the other.
    """
    self.base_ten_int = base_ten_int
    self.base_eight_str = base_eight_str

  @classmethod
  def from_base_ten_int(cls, base_ten_int):
    """Initializes class from base ten int. E.g. 73."""
    base_eight_str = convert_base_ten_to_base_eight_str(base_ten_int)
    # Not using original base_ten_int because str version removes unwanted bits.
    # Build through `cls` so subclasses get instances of their own type.
    return cls(
        convert_base_eight_str_to_base_ten_int(base_eight_str), base_eight_str)

  @classmethod
  def from_base_eight_str(cls, base_eight_str):
    """Initializes class from base eight (octal) string. E.g. '111'.

    Raises:
      ValueError: `base_eight_str` is not a valid octal integer.
    """
    # The string is stored exactly as given; only the int form is derived.
    return cls(
        convert_base_eight_str_to_base_ten_int(base_eight_str), base_eight_str)

  def __eq__(self, other):
    # Defer to the other operand for unrelated types rather than claiming
    # inequality.
    if not isinstance(other, type(self)):
      return NotImplemented
    return (self.base_ten_int == other.base_ten_int and
            self.base_eight_str == other.base_eight_str)

  def __repr__(self):
    return '(base-ten int: {}, base-eight str: {})'.format(
        self.base_ten_int, self.base_eight_str)


# Immutable record of POSIX facts about the machine running the command.
#
# Fields:
#   default_mode (PosixMode): Permissions newly created files receive.
#   user_groups (set): GIDs of the Unix groups the user belongs to: the
#     primary group plus any number of secondary groups.
SystemPosixData = collections.namedtuple(
    'SystemPosixData', ('default_mode', 'user_groups'))


def _get_default_mode():
  """Returns the permissions new files get by default, as a PosixMode.

  Not thread-safe: the process umask is briefly changed while it is read, so
  call this only where concurrent file operations are impossible, such as a
  command surface.
  """
  # The umask can only be read by setting a new one, so install a restrictive
  # temporary value (177) and put the real one straight back.
  process_umask = os.umask(0o177)
  os.umask(process_umask)
  # A umask lists the permissions to withhold, so remove it from the full set.
  permitted_bits = 0o777 - process_umask
  # New files never receive execute permission by default; the 666 bitmask
  # clears the execute bit of each of the three permission digits.
  return PosixMode.from_base_ten_int(permitted_bits & 0o666)


def _get_user_groups():
  """Returns the set of GIDs for every group the current user belongs to."""
  # grp, pwd, and os.getuid do not exist on Windows, so import lazily.
  # pylint:disable=g-import-not-at-top
  import grp
  import pwd
  # pylint:enable=g-import-not-at-top
  passwd_entry = pwd.getpwuid(os.getuid())
  # Start from the primary group recorded in the user's passwd entry.
  group_ids = {passwd_entry.pw_gid}
  # Secondary groups are those that list the user name as a member.
  for group in grp.getgrall():
    if passwd_entry.pw_name in group.gr_mem:
      group_ids.add(group.gr_gid)
  return group_ids


@function_result_cache.lru(maxsize=1)
def get_system_posix_data():
  """Collects the system-wide POSIX info; the result is cached after one call.

  Returns:
    SystemPosixData: Default file mode and the user's group IDs. Both fields
      are None on Windows.
  """
  on_windows = platforms.OperatingSystem.IsWindows()
  if on_windows:
    return SystemPosixData(None, None)

  return SystemPosixData(
      default_mode=_get_default_mode(), user_groups=_get_user_groups())


def _raise_error_and_maybe_delete_file(error, delete_path):
  """Raises `error`, first removing the file at `delete_path` if one is given.

  Args:
    error (Exception): Exception instance to raise.
    delete_path (str|None): File to remove before raising. Skipped if falsy.

  Raises:
    The `error` argument, always. An exception from `os.remove` propagates
    instead if the deletion itself fails.
  """
  if not delete_path:
    raise error
  os.remove(delete_path)
  raise error


def raise_if_invalid_file_permissions(
    system_posix_data,
    resource,
    delete_path=None,
    known_posix=None,
):
  """Detects permissions causing inaccessibility.

  Can delete invalid file.

  After confirming that any UID and GID in the metadata exist locally, exactly
  one permission class is checked for read access, in priority order: owner
  (when the current user would own the file), then group (when the user is in
  the owning group), then "other".

  Args:
    system_posix_data (SystemPosixData): Helps determine if file will be made
      inaccessible in local environment.
    resource (ObjectResource): Contains URL used for messages and custom POSIX
      metadata used to determine if setting invalid file permissions.
    delete_path (str|None): If present, will delete file before raising error.
      Useful if file has been downloaded and needs to be cleaned up.
    known_posix (PosixAttributes|None): Use pre-parsed POSIX data instead of
      extracting from source. Not super important here because the source is a
      cloud object and doesn't require an `os.stat` call to harvest metadata,
      but it would be strange if we used `known_posix` for callers and only
      `resource` here, especially if the values were different (which they
      shouldn't be). Be careful using this because, if the data is wrong, it
      could mess with these safety checks.

  Raises:
    SystemPermissionError: Has explanatory message about issue.
  """
  _, _, uid, gid, mode = (
      known_posix or get_posix_attributes_from_cloud_resource(resource)
  )
  nothing_to_apply = uid is None and gid is None and mode is None
  if nothing_to_apply or platforms.OperatingSystem.IsWindows():
    # If the user isn't setting anything, the system's new file defaults
    # are used, which we assume are valid.
    # Windows doesn't use POSIX for file permissions, so files will validate.
    return

  # POSIX modules, os.geteuid, and os.getuid not available on Windows.
  if os.geteuid() == 0:
    # The root user can access files regardless of their permissions.
    return

  # pylint:disable=g-import-not-at-top
  import grp
  import pwd
  # pylint:enable=g-import-not-at-top

  url_string = resource.storage_url.url_string
  if uid is not None:
    try:
      pwd.getpwuid(uid)
    except KeyError:
      error = errors.SystemPermissionError(
          _MISSING_UID_FORMAT.format(url_string, uid)
      )
      _raise_error_and_maybe_delete_file(error, delete_path)
  if gid is not None:
    try:
      grp.getgrgid(gid)
    except (KeyError, OverflowError):
      error = errors.SystemPermissionError(
          _MISSING_GID_FORMAT.format(url_string, gid)
      )
      _raise_error_and_maybe_delete_file(error, delete_path)

  # Without a mode in the metadata, the file keeps the system default mode.
  mode_to_set = system_posix_data.default_mode if mode is None else mode

  current_uid = os.getuid()
  # Compare against None explicitly: 0 is a real UID (root) and must not be
  # mistaken for "no UID in metadata".
  uid_to_set = current_uid if uid is None else uid
  if uid_to_set == current_uid:
    # No UID causes system to default to current user as owner.
    # Owner permissions take priority over group and "other".
    if mode_to_set.base_ten_int & stat.S_IRUSR:
      return
    error = errors.SystemPermissionError(
        _INSUFFICIENT_USER_READ_ACCESS_FORMAT.format(
            url_string, uid_to_set, mode_to_set.base_eight_str
        )
    )
    _raise_error_and_maybe_delete_file(error, delete_path)

  if gid is None or gid in system_posix_data.user_groups:
    # No GID causes system to create file owned by user's primary group.
    # Group permissions take priority over "other" if user is member of group.
    if mode_to_set.base_ten_int & stat.S_IRGRP:
      return

    error = errors.SystemPermissionError(
        _INSUFFICIENT_GROUP_READ_ACCESS_FORMAT.format(
            url_string,
            '[user primary group]' if gid is None else gid,
            mode_to_set.base_eight_str,
        )
    )
    _raise_error_and_maybe_delete_file(error, delete_path)

  if mode_to_set.base_ten_int & stat.S_IROTH:
    # User is not owner and not in relevant group. User is "other".
    return
  error = errors.SystemPermissionError(
      _INSUFFICIENT_OTHER_READ_ACCESS_FORMAT.format(
          url_string, uid_to_set, mode_to_set.base_eight_str
      )
  )
  _raise_error_and_maybe_delete_file(error, delete_path)


# Immutable record of the POSIX metadata that can be read from, or applied to,
# a single file.
#
# A field set to None typically means "use the system default".
#
# Fields:
#   atime (int|None): Access time, in seconds since the epoch.
#   mtime (int|None): Modification time, in seconds since the epoch.
#   uid (int|None): ID of the user marked as owning the file.
#   gid (int|None): ID of the group marked as owning the file.
#   mode (PosixMode|None): Access permissions for the file.
PosixAttributes = collections.namedtuple(
    'PosixAttributes', ('atime', 'mtime', 'uid', 'gid', 'mode'))


def get_posix_attributes_from_file(file_path, preserve_symlinks=False):
  """Stats a local file and returns its metadata as PosixAttributes.

  Args:
    file_path (str): Path of the file to inspect.
    preserve_symlinks (bool): If True, and the platform's `os.stat` supports
      it, a symlink itself is inspected instead of its target.

  Returns:
    PosixAttributes: Times are the integer values from the stat tuple.
  """
  stat_can_skip_links = os.stat in os.supports_follow_symlinks
  follow_symlinks = not (preserve_symlinks and stat_can_skip_links)
  stat_fields = os.stat(file_path, follow_symlinks=follow_symlinks)
  # Index into the stat tuple rather than reading the st_* attributes: the
  # tuple carries whole-second integer times.
  return PosixAttributes(
      stat_fields[stat.ST_ATIME],
      stat_fields[stat.ST_MTIME],
      stat_fields[stat.ST_UID],
      stat_fields[stat.ST_GID],
      PosixMode.from_base_ten_int(stat_fields[stat.ST_MODE]),
  )


def set_posix_attributes_on_file_if_valid(
    system_posix_data,
    source_resource,
    destination_resource,
    known_source_posix=None,
    known_destination_posix=None,
    preserve_symlinks=False,
):
  """Sets custom POSIX attributes on file if the final metadata will be valid.

  This function is typically called after downloads.
  `raise_if_invalid_file_permissions` should have been called before initiating
  a download, but we call it again here to be safe.

  Attributes are applied in a fixed order, each step skipped when the custom
  value is absent or already matches the file: access/modification times
  (`os.utime`), then ownership (`os.chown`), then mode (`os.chmod`). On
  Windows only the times are applied.

  Args:
    system_posix_data (SystemPosixData): System-wide POSIX. Helps fill in
      missing data and determine validity of result.
    source_resource (resource_reference.ObjectResource): Source resource with
      POSIX attributes to apply.
    destination_resource (resource_reference.FileObjectResource): Destination
      resource to apply POSIX attributes to.
    known_source_posix (PosixAttributes|None): Use pre-parsed POSIX data instead
      of extracting from source.
    known_destination_posix (PosixAttributes|None): Use pre-parsed POSIX data
      instead of extracting from destination.
    preserve_symlinks (bool): Whether symlinks should be preserved rather than
      followed.

  Returns:
    None.

  Raises:
    SystemPermissionError: Custom metadata asked for file ownership change that
      user did not have permission to perform. Other permission errors from
      OS functions are possible. Also see `raise_if_invalid_file_permissions`.
  """
  destination_path = destination_resource.storage_url.resource_name
  # destination_path is passed as `delete_path`, so the destination file is
  # removed if this validation fails.
  raise_if_invalid_file_permissions(
      system_posix_data,
      source_resource,
      destination_path,
      known_posix=known_source_posix,
  )

  # "Custom" is what the source metadata asks for; "existing" is what the
  # destination file currently has.
  custom_posix_attributes = (
      known_source_posix
      or get_posix_attributes_from_cloud_resource(source_resource)
  )
  existing_posix_attributes = (
      known_destination_posix
      or get_posix_attributes_from_file(destination_path, preserve_symlinks)
  )

  # os.utime sets both times in one call, so a time missing from the custom
  # metadata is filled in with the file's existing value.
  if custom_posix_attributes.atime is None:
    atime = existing_posix_attributes.atime
    need_utime_call = False
  else:
    atime = custom_posix_attributes.atime
    need_utime_call = (
        custom_posix_attributes.atime != existing_posix_attributes.atime
    )
  if custom_posix_attributes.mtime is None:
    mtime = existing_posix_attributes.mtime
  else:
    mtime = custom_posix_attributes.mtime
    need_utime_call = (
        need_utime_call
        or custom_posix_attributes.mtime != existing_posix_attributes.mtime
    )

  # Don't follow symlinks if they're being preserved.
  if need_utime_call:
    # Fall back to following symlinks where os.utime cannot operate on the
    # link itself.
    follow_symlinks = (
        not preserve_symlinks or os.utime not in os.supports_follow_symlinks
    )
    os.utime(destination_path, (atime, mtime), follow_symlinks=follow_symlinks)

  if platforms.OperatingSystem.IsWindows():
    # Windows does not use the remaining POSIX attributes.
    return

  if custom_posix_attributes.uid is None:
    # No custom UID: keep the file's current owner.
    uid = existing_posix_attributes.uid
    need_chown_call = False
  else:
    uid = custom_posix_attributes.uid
    need_chown_call = (
        custom_posix_attributes.uid != existing_posix_attributes.uid
    )

    # Changing the owner is refused unless the effective user is root.
    if uid != existing_posix_attributes.uid and os.geteuid() != 0:
      # Clean up file we can't set proper metadata on.
      os.remove(destination_path)
      # Custom may equal existing if user is uploading and downloading on the
      # same machine and account.
      raise errors.SystemPermissionError(
          'Root permissions required to set UID {}.'.format(uid)
      )

  if custom_posix_attributes.gid is None:
    # No custom GID: keep the file's current group.
    gid = existing_posix_attributes.gid
  else:
    gid = custom_posix_attributes.gid
    need_chown_call = (
        need_chown_call
        or custom_posix_attributes.gid != existing_posix_attributes.gid
    )

  if need_chown_call:
    # Note: chown doesn't do anything for negative numbers like _INVALID_ID.
    # NOTE(review): `_INVALID_ID` is not defined in the visible part of this
    # module; confirm whether the note above is still relevant.
    follow_symlinks = (
        not preserve_symlinks or os.chown not in os.supports_follow_symlinks
    )
    os.chown(destination_path, uid, gid, follow_symlinks=follow_symlinks)

  # The mode is changed only when the metadata provides one that differs from
  # the file's current mode.
  if custom_posix_attributes.mode is not None and (
      custom_posix_attributes.mode.base_ten_int
      != existing_posix_attributes.mode.base_ten_int
  ):
    follow_symlinks = (
        not preserve_symlinks or os.chmod not in os.supports_follow_symlinks
    )
    os.chmod(
        destination_path,
        custom_posix_attributes.mode.base_ten_int,
        follow_symlinks=follow_symlinks,
    )


def _extract_time_from_custom_metadata(resource, key):
  """Reads a timestamp from custom metadata, returning None if unusable.

  A warning is logged, and None returned, when the stored value is not an
  integer, is negative, or lies more than one day past the system time.

  Args:
    resource (ObjectResource): Supplies `custom_fields` and the URL used in
      warnings.
    key (str): Custom metadata key to read.

  Returns:
    int|None: Seconds since the epoch, or None if missing or invalid.
  """
  if not resource.custom_fields:
    return None
  raw_value = resource.custom_fields.get(key)
  if raw_value is None:
    return None

  try:
    seconds = int(raw_value)
  except ValueError:
    log.warning(
        '{} metadata did not contain a numeric value for {}: {}'.format(
            resource.storage_url.url_string, key, raw_value
        )
    )
    return None

  if seconds < 0:
    log.warning(
        'Found negative time value in {} metadata {}: {}'.format(
            resource.storage_url.url_string, key, raw_value
        )
    )
    return None

  # Allow one day of slack beyond the current system time.
  current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()
  if seconds > current_time + _SECONDS_PER_DAY:
    log.warning(
        'Found {} value in {} metadata that is more than one day in the'
        ' future from the system time: {}'.format(
            key, resource.storage_url.url_string, raw_value
        )
    )
    return None

  return seconds


def _extract_id_from_custom_metadata(resource, key):
  """Reads a UID or GID from custom metadata, returning None if unusable.

  A warning is logged, and None returned, when the stored value is not an
  integer or is negative.

  Args:
    resource (ObjectResource): Supplies `custom_fields` and the URL used in
      warnings.
    key (str): Custom metadata key to read.

  Returns:
    int|None: The non-negative ID, or None if missing or invalid.
  """
  if not resource.custom_fields:
    return None
  raw_value = resource.custom_fields.get(key)
  if raw_value is None:
    return None

  try:
    parsed_id = int(raw_value)
  except ValueError:
    log.warning(
        '{} metadata did not contain a numeric value for {}: {}'.format(
            resource.storage_url.url_string, key, raw_value
        )
    )
    return None

  if parsed_id >= 0:
    return parsed_id

  log.warning(
      'Found negative ID value in {} metadata {}: {}'.format(
          resource.storage_url.url_string, key, raw_value
      )
  )
  return None


def _extract_mode_from_custom_metadata(resource):
  """Reads the POSIX mode from custom metadata, returning None if unusable.

  A warning is logged, and None returned, when the stored value cannot be
  parsed as an octal integer.

  Args:
    resource (ObjectResource): Supplies `custom_fields` and the URL used in
      warnings.

  Returns:
    PosixMode|None: Parsed mode, or None if missing or invalid.
  """
  metadata = resource.custom_fields
  if not metadata:
    return None
  octal_text = metadata.get(MODE_METADATA_KEY)
  if octal_text is None:
    return None

  try:
    return PosixMode.from_base_eight_str(octal_text)
  except ValueError:
    log.warning(
        '{} metadata did not contain a valid permissions octal string'
        ' for {}: {}'.format(
            resource.storage_url.url_string,
            MODE_METADATA_KEY,
            octal_text,
        )
    )
    return None


def get_posix_attributes_from_cloud_resource(resource):
  """Builds PosixAttributes from a cloud object's custom metadata.

  Note: Only the object's *custom* metadata (user-set fields) is read, not the
  full metadata with provider-set fields.

  Args:
    resource (ObjectResource): Contains URL to include in logged warnings and
      custom metadata to parse.

  Returns:
    PosixAttributes: A field is None if its value is missing or invalid.
  """
  return PosixAttributes(
      atime=_extract_time_from_custom_metadata(resource, ATIME_METADATA_KEY),
      mtime=_extract_time_from_custom_metadata(resource, MTIME_METADATA_KEY),
      uid=_extract_id_from_custom_metadata(resource, UID_METADATA_KEY),
      gid=_extract_id_from_custom_metadata(resource, GID_METADATA_KEY),
      mode=_extract_mode_from_custom_metadata(resource),
  )


def get_posix_attributes_from_resource(resource, preserve_symlinks=False):
  """Returns PosixAttributes for either a cloud object or a local file.

  Args:
    resource (resource_reference.Resource): Resource to inspect.
    preserve_symlinks (bool): Passed through for local files; see
      `get_posix_attributes_from_file`.

  Returns:
    PosixAttributes for the resource.

  Raises:
    InvalidUrlError: The resource is neither a cloud object nor a file.
  """
  if isinstance(resource, resource_reference.ObjectResource):
    return get_posix_attributes_from_cloud_resource(resource)

  if not isinstance(resource, resource_reference.FileObjectResource):
    raise errors.InvalidUrlError(
        'Can only retrieve POSIX attributes from file or cloud'
        ' object, not: {}'.format(resource.TYPE_STRING)
    )

  local_path = resource.storage_url.resource_name
  return get_posix_attributes_from_file(local_path, preserve_symlinks)


def update_custom_metadata_dict_with_posix_attributes(metadata_dict,
                                                      posix_attributes):
  """Writes every non-None POSIX attribute into `metadata_dict` in place.

  Args:
    metadata_dict (dict): Custom metadata to modify. Values are stored as
      strings.
    posix_attributes (PosixAttributes): Attributes to record. Fields that are
      None leave the corresponding key untouched.
  """
  integer_fields = (
      (ATIME_METADATA_KEY, posix_attributes.atime),
      (MTIME_METADATA_KEY, posix_attributes.mtime),
      (UID_METADATA_KEY, posix_attributes.uid),
      (GID_METADATA_KEY, posix_attributes.gid),
  )
  for metadata_key, value in integer_fields:
    if value is not None:
      metadata_dict[metadata_key] = str(value)

  # The mode already carries its own string form.
  mode = posix_attributes.mode
  if mode is not None:
    metadata_dict[MODE_METADATA_KEY] = mode.base_eight_str


def raise_if_source_and_destination_not_valid_for_preserve_posix(
    source_url, destination_url
):
  """Raises if POSIX data cannot be preserved for this source/destination pair.

  Args:
    source_url (storage_url.StorageUrl): Where data is copied from.
    destination_url (storage_url.StorageUrl): Where data is copied to.

  Raises:
    InvalidUrlError: Either URL is a file stream (pipe), or both URLs are
      cloud URLs.
  """

  def _is_file_stream(url):
    return isinstance(url, storage_url.FileUrl) and url.is_stream

  if _is_file_stream(source_url):
    raise errors.InvalidUrlError(
        'Cannot preserve POSIX data from pipe: {}'.format(source_url)
    )
  if _is_file_stream(destination_url):
    raise errors.InvalidUrlError(
        'Cannot write POSIX data to pipe: {}'.format(destination_url)
    )

  cloud_to_cloud = all(
      isinstance(url, storage_url.CloudUrl)
      for url in (source_url, destination_url)
  )
  if cloud_to_cloud:
    raise errors.InvalidUrlError(
        'Cannot preserve POSIX data for cloud-to-cloud copies'
    )


def run_if_setting_posix(
    posix_to_set, user_request_args, function, *args, **kwargs
):
  """Calls `function` only when POSIX data is being set or preserved.

  Args:
    posix_to_set: Truthy if there are POSIX attributes to apply.
    user_request_args: Object with a `preserve_posix` attribute, or a falsy
      value if there is none.
    function (callable): Invoked with `*args` and `**kwargs` when gated in.
    *args: Positional arguments for `function`.
    **kwargs: Keyword arguments for `function`.

  Returns:
    The result of `function`, or None if it was not called.
  """
  if not posix_to_set and not (
      user_request_args and user_request_args.preserve_posix
  ):
    return None
  return function(*args, **kwargs)