HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/calliope/command_loading.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers to load commands from the filesystem."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import functools
import importlib
import os
import re

from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import command_release_tracks
from googlecloudsdk.core import exceptions
from googlecloudsdk.core.util import pkg_resources
from ruamel import yaml
import six

# Top-level YAML key that marks a command file as being split into partials;
# _IsCommandWithPartials looks for the exact line '_PARTIALS_: true'.
PARTIALS_ATTRIBUTE = '_PARTIALS_'
# Name of the sub-directory, next to the command's YAML file, in which
# _LoadCommandWithPartials looks for the partial files.
PARTIALS_DIR = '_partials'


class CommandLoadFailure(Exception):
  """Raised when the module backing a command or group cannot be loaded.

  Attributes:
    command: The command identifier given by the caller (callers in this file
      pass the dot-joined CLI path).
    root_exception: The underlying exception that caused the failure.
  """

  def __init__(self, command, root_exception):
    message = 'Problem loading {command}: {issue}.'.format(
        command=command, issue=six.text_type(root_exception)
    )
    super(CommandLoadFailure, self).__init__(message)
    self.command = command
    self.root_exception = root_exception


class LayoutException(Exception):
  """Raised when a command or group file does not have the expected layout.

  Used in this file for illegal element names, files that define the wrong
  kind (or number) of commands/groups, conflicting release tracks, and broken
  YAML references.
  """


class ReleaseTrackNotImplementedException(Exception):
  """Raised when a command or group has no implementation for a track.

  Signals that none of the loaded implementations is valid for the release
  track that was requested.
  """


class YamlCommandTranslator(six.with_metaclass(abc.ABCMeta, object)):
  """Interface for custom loaders that turn yaml specs into commands.

  Implementations are handed to the loading functions in this module, which
  call Translate() lazily for each release-track-specific command spec.
  """

  @abc.abstractmethod
  def Translate(self, path, command_data):
    """Builds a calliope command class from a parsed yaml command spec.

    Args:
      path: [str], The group names leading down to this command with respect
        to the CLI itself. Intended for things like error reporting, when a
        specific element in the tree needs to be referenced.
      command_data: dict, The parsed contents of the command spec from the
        yaml file for the release track being loaded.

    Returns:
      calliope.base.Command, A command class (not an instance) that
      implements the spec.
    """


def FindSubElements(impl_paths, path):
  """Lists the sub groups and commands that live directly under a group.

  Args:
    impl_paths: [str], The file paths implementing this group. Groups may only
      have a single implementation; more than one is an error.
    path: [str], The group names leading down to this command group with
      respect to the CLI itself. Used for error reporting.

  Raises:
    CommandLoadFailure: If more than one implementation path is given.
    LayoutException: If there is a command or group with an illegal name.

  Returns:
    ({str: [str]}, {str: [str]}), A (groups, commands) tuple where each item
    maps a CLI name to the list of paths implementing it. There can be several
    paths because an element may be implemented in both python and yaml (for
    different release tracks).
  """
  if len(impl_paths) > 1:
    cause = Exception('Command groups cannot be implemented in yaml')
    raise CommandLoadFailure('.'.join(path), cause)

  group_dir = impl_paths[0]
  group_names, command_names = pkg_resources.ListPackage(
      group_dir, extra_extensions=['.yaml']
  )
  sub_groups = _GenerateElementInfo(group_dir, group_names)
  sub_commands = _GenerateElementInfo(group_dir, command_names)
  return sub_groups, sub_commands


def _GenerateElementInfo(impl_path, names):
  """Generates the data a group needs to load sub elements.

  Args:
    impl_path: The file path to the command implementation for this group.
    names: [str], The names of the sub groups or commands found in the group.

  Raises:
    LayoutException: if there is a command or group with an illegal name.

  Returns:
    {str: [str], A mapping from name to a list of paths that implement that
    command or group. There can be multiple paths because a command or group
    could be implemented in both python and yaml (for different release tracks).
  """
  elements = {}
  for name in names:
    if re.search('[A-Z]', name):
      raise LayoutException(
          'Commands and groups cannot have capital letters: {0}.'.format(name)
      )
    cli_name = name[:-5] if name.endswith('.yaml') else name
    sub_path = os.path.join(impl_path, name)

    existing = elements.setdefault(cli_name, [])
    existing.append(sub_path)
  return elements


def LoadCommonType(
    impl_paths,
    path,
    release_track,
    construction_id,
    is_command,
    yaml_command_translator=None,
):
  """Loads the calliope command or group for one release track.

  Collects every implementation found in impl_paths, picks the one valid for
  release_track and invokes its factory.

  Args:
    impl_paths: [str], The file paths implementing this group or command.
    path: [str], The group names leading down to this element with respect to
      the CLI itself. Used for error reporting.
    release_track: ReleaseTrack, The release track that should be loaded.
    construction_id: str, A unique identifier for the CLILoader that is being
      constructed.
    is_command: bool, True to load a command, False to load a group.
    yaml_command_translator: YamlCommandTranslator, The translator used to
      turn yaml data into commands.

  Raises:
    CommandLoadFailure: If the command is invalid and cannot be loaded.

  Returns:
    The base._Common class for the command or group.
  """
  candidates = _GetAllImplementations(
      impl_paths, path, construction_id, is_command, yaml_command_translator
  )
  # Errors are reported against the first implementation file.
  get_implementation = _ExtractReleaseTrackImplementation(
      impl_paths[0], release_track, candidates
  )
  return get_implementation()


def Cache(func):
  """Decorator that memoizes func on its positional arguments.

  Results are kept in a dict private to the decorated function for as long as
  that function object lives; there is no eviction. Calls that raise are not
  cached, so they are retried on the next invocation.

  Args:
    func: The function to wrap. It is called with positional arguments only,
      and those arguments must be hashable.

  Returns:
    A wrapper with the same name and docstring as func that returns the stored
    result when it has already been called with the same arguments.
  """
  cached_results = {}

  @functools.wraps(func)
  def ReturnCachedOrCallFunc(*args):
    if args in cached_results:
      return cached_results[args]
    # Call func outside of any `except KeyError` handler so that exceptions it
    # raises are not chained to an unrelated, internal cache-miss KeyError.
    result = func(*args)
    cached_results[args] = result
    return result

  return ReturnCachedOrCallFunc


@Cache
def _SafeLoadYamlFile(path):
  """Reads the resource at path and parses it with yaml.safe_load.

  The parsed result is memoized per path by the Cache decorator.

  Args:
    path: str, The path of the yaml file to load.

  Returns:
    The parsed yaml data.
  """
  raw_contents = pkg_resources.GetResourceFromFile(path)
  return yaml.safe_load(raw_contents)


@Cache
def _CustomLoadYamlFile(path):
  """Parses the yaml file at path with the loader from CreateYamlLoader.

  The parsed result is memoized per path by the Cache decorator.

  Args:
    path: str, The path of the yaml file to load.

  Returns:
    The parsed yaml data.
  """
  loader = CreateYamlLoader(path)
  return loader.load(pkg_resources.GetResourceFromFile(path))


def _GetAllImplementations(
    impl_paths, path, construction_id, is_command, yaml_command_translator
):
  """Collects the implementations of an element across all release tracks.

  Handles both python modules and yaml files. Yaml files are only accepted
  for commands, and may either be a regular yaml command or a command that is
  split into partials.

  Args:
    impl_paths: [str], The file paths implementing this group or command.
    path: [str], The group names leading down to this element with respect to
      the CLI itself. Used for error reporting.
    construction_id: str, A unique identifier for the CLILoader that is being
      constructed.
    is_command: bool, True to load a command, False to load a group.
    yaml_command_translator: YamlCommandTranslator, The translator used to
      turn yaml data into commands.

  Raises:
    CommandLoadFailure: If the command is invalid and cannot be loaded.

  Returns:
    [(func->base._Common, [base.ReleaseTrack])], A list of tuples that can be
    passed to _ExtractReleaseTrackImplementation. Each tuple describes one
    implementation: a function returning the implementation, and the release
    tracks it is valid for.
  """
  implementations = []
  for impl_file in impl_paths:
    if not impl_file.endswith('.yaml'):
      # Python implementation: import the module and inspect its contents.
      module = _GetModuleFromPath(impl_file, path, construction_id)
      module_file = module.__file__
      module_attributes = list(module.__dict__.values())
      implementations.extend(
          _ImplementationsFromModule(
              module_file, module_attributes, is_command=is_command
          )
      )
      continue

    # Yaml implementation: only commands may be declared this way.
    if not is_command:
      raise CommandLoadFailure(
          '.'.join(path),
          Exception('Command groups cannot be implemented in yaml'),
      )
    if _IsCommandWithPartials(impl_file, path):
      yaml_data = _LoadCommandWithPartials(impl_file, path)
    else:
      yaml_data = _CustomLoadYamlFile(impl_file)
    implementations.extend(
        _ImplementationsFromYaml(path, yaml_data, yaml_command_translator)
    )
  return implementations


def _IsCommandWithPartials(impl_file, path):
  """Tells whether a YAML command file only declares that it uses partials.

  Blank lines and lines starting with '#' are ignored. The file counts as a
  command with partials when its first remaining line is the partials marker
  and no other content follows it.

  Args:
    impl_file: file path to the main YAML command implementation.
    path: [str], The group names leading down to this command with respect to
      the CLI itself. Used for error reporting.

  Raises:
    CommandLoadFailure: If the file has the partials marker followed by any
      other content.

  Returns:
    Whether or not it is a valid command with partials to load.
  """
  partials_marker = f'{PARTIALS_ATTRIBUTE}: true'
  marker_seen = False
  with pkg_resources.GetFileTextReaderByLine(impl_file) as yaml_lines:
    for raw_line in yaml_lines:
      content = raw_line.strip()
      if not content or content.startswith('#'):
        continue
      if content == partials_marker:
        marker_seen = True
        continue
      if not marker_seen:
        # First meaningful line is regular content: not a partials command.
        break
      raise CommandLoadFailure(
          '.'.join(path),
          Exception(
              f'Command with {PARTIALS_ATTRIBUTE} attribute cannot have'
              ' extra content'
          ),
      )

  return marker_seen


def _LoadCommandWithPartials(impl_file, path):
  """Loads the YAML partials that make up a command, based on conventions.

  Each partial file is loaded with _CustomLoadYamlFile, like a normal YAML
  command, and its entries are appended to a single list.

  Conventions:
  - Partials live in the `_partials` subfolder next to the main command file.
  - Partial file names match the main command name; only files named
    _[command_name]_alpha.yaml, _[command_name]_beta.yaml or
    _[command_name]_ga.yaml are loaded.
  - Release tracks should not be duplicated across all partials.

  Args:
    impl_file: file path to the main YAML command implementation.
    path: [str], The group names leading down to this command with respect to
      the CLI itself. Used for error reporting.

  Raises:
    CommandLoadFailure: If a release track is declared more than once across
      the loaded partials.

  Returns:
    List with data loaded from partial YAML files for the main command.
  """
  main_dir, main_file_name = os.path.split(impl_file)
  command_name = main_file_name[:-5]  # strip .yaml
  partials_dir = os.path.join(main_dir, PARTIALS_DIR)
  partial_files = pkg_resources.GetFilesFromDirectory(
      partials_dir, f'_{command_name}_*.yaml'
  )

  command_path = re.escape(os.path.join(partials_dir, f'_{command_name}'))
  track_file_pattern = re.compile(fr'{command_path}_(alpha|beta|ga)\.yaml')

  command_data_list = []
  for partial_file in partial_files:
    if track_file_pattern.match(partial_file) is None:
      continue
    command_data_list.extend(_CustomLoadYamlFile(partial_file))

  _ValidateCommandWithPartials(command_data_list, path)
  return command_data_list


def _ValidateCommandWithPartials(command_data_list, path):
  """Validates that the command with partials do not have duplicated tracks.

  Args:
    command_data_list: List with data loaded from all YAML partials.
    path: [str], A list of group names that got us down to this command group
      with respect to the CLI itself.  This path should be used for things like
      error reporting when a specific element in the tree needs to be
      referenced.

  Raises:
    CommandLoadFailure: If the command is invalid and should not be loaded.
  """
  release_tracks = set()
  for command_data in command_data_list:
    for release_track in command_data['release_tracks']:
      if release_track in release_tracks:
        raise CommandLoadFailure(
            '.'.join(path),
            Exception(
                'Command with partials cannot have duplicated release tracks.'
                f' Found multiple [{release_track}s]'
            ),
        )
      else:
        release_tracks.add(release_track)


def CreateYamlLoader(impl_path):
  """Creates a custom yaml loader that handles includes from common data.

  The common data is read from the `__init__.yaml` file that sits in the same
  directory as impl_path, if that file can be loaded. The returned loader also
  resolves references to arbitrary yaml files (see Constructor below).

  Args:
    impl_path: str, The path to the file we are loading data from.

  Returns:
    yaml.YAML, A ruamel yaml loader object configured with the custom
    Constructor; call its load() method to parse data.
  """
  common_file_path = os.path.join(os.path.dirname(impl_path), '__init__.yaml')
  common_data = None
  try:
    common_data = _SafeLoadYamlFile(common_file_path)
  except IOError:
    # A missing/unreadable common file is not an error by itself: common_data
    # stays None and _GetCommonData raises only if the command references it.
    pass

  class Constructor(yaml.Constructor):
    """A custom yaml constructor.

    It adds 2 different import capabilities. Assuming __init__.yaml has the
    contents:

    foo:
      a: b
      c: d

    baz:
      - e: f
      - g: h

    The first uses a custom constructor to insert data into your current file,
    so:

    bar: !COMMON foo.a

    results in:

    bar: b

    The second mechanism overrides construct_mapping and construct_sequence to
    post process the data and replace the merge macro with keys from the other
    file. We can't use the custom constructor for this as well because the
    merge key type in yaml is processed before custom constructors which makes
    importing and merging not possible. So:

    bar:
      _COMMON_: foo
      i: j

    results in:

    bar:
      a: b
      c: d
      i: j

    This can also be used to merge list contexts, so:

    bar:
      - _COMMON_baz
      - i: j

    results in:

    bar:
      - e: f
      - g: h
      - i: j

    You may also use the !REF and _REF_ directives in the same way. Instead of
    pulling from the common file, they can pull from an arbitrary yaml file
    somewhere in the googlecloudsdk tree. The syntax looks like:

    bar: !REF googlecloudsdk.foo.bar:a.b.c

    This will load googlecloudsdk/foo/bar.yaml and from that file return the
    a.b.c nested attribute.

    In the merge forms, several comma separated attribute paths may be given;
    they are applied in order.
    """

    # Tag that includes a value from the common file.
    INCLUDE_COMMON_MACRO = '!COMMON'
    # Mapping key / list item prefix that merges data from the common file.
    MERGE_COMMON_MACRO = '_COMMON_'
    # Tag that includes a value from another yaml file.
    INCLUDE_REF_MACRO = '!REF'
    # Mapping key / list item prefix that merges data from another yaml file.
    MERGE_REF_MACRO = '_REF_'

    def construct_mapping(self, *args, **kwargs):
      """Builds a mapping, then expands the _COMMON_ and _REF_ merge keys."""
      data = super(Constructor, self).construct_mapping(*args, **kwargs)
      data = self._ConstructMappingHelper(
          Constructor.MERGE_COMMON_MACRO, self._GetCommonData, data
      )
      return self._ConstructMappingHelper(
          Constructor.MERGE_REF_MACRO, self._GetRefData, data
      )

    def _ConstructMappingHelper(self, macro, source_func, data):
      """Replaces the `macro` key in data with the mappings it points to.

      Args:
        macro: str, The merge key to look for in data.
        source_func: Function that returns the data for one attribute path.
        data: The constructed mapping; the macro key is removed from it.

      Returns:
        data itself when the macro key is absent or has a falsy value,
        otherwise a new dict holding the imported data overlaid with data.
      """
      attribute_path = data.pop(macro, None)
      if not attribute_path:
        return data

      modified_data = {}
      # Later paths override keys imported from earlier ones.
      for path in attribute_path.split(','):
        modified_data.update(source_func(path))
      # Add the explicit data last so it can override the imports.
      modified_data.update(data)
      return modified_data

    def construct_sequence(self, *args, **kwargs):
      """Builds a sequence, then expands _COMMON_ and _REF_ merge items."""
      data = super(Constructor, self).construct_sequence(*args, **kwargs)
      data = self._ConstructSequenceHelper(
          Constructor.MERGE_COMMON_MACRO, self._GetCommonData, data
      )
      return self._ConstructSequenceHelper(
          Constructor.MERGE_REF_MACRO, self._GetRefData, data
      )

    def _ConstructSequenceHelper(self, macro, source_func, data):
      """Returns a copy of data with `macro` items replaced by imported items.

      Args:
        macro: str, The prefix that marks a string item as a merge directive.
        source_func: Function that returns the data for one attribute path.
        data: The constructed sequence.

      Returns:
        A new list in which every string item starting with macro is replaced,
        in place, by the items of the data it references.
      """
      new_list = []
      for i in data:
        if isinstance(i, six.string_types) and i.startswith(macro):
          # Everything after the macro prefix is the (comma separated) list
          # of attribute paths to splice in.
          attribute_path = i[len(macro) :]
          for path in attribute_path.split(','):
            new_list.extend(source_func(path))
        else:
          new_list.append(i)
      return new_list

    def IncludeCommon(self, node):
      """Handler for the !COMMON tag: returns the referenced common data."""
      attribute_path = self.construct_scalar(node)
      return self._GetCommonData(attribute_path)

    def IncludeRef(self, node):
      """Handler for the !REF tag: returns the referenced yaml file data."""
      attribute_path = self.construct_scalar(node)
      return self._GetRefData(attribute_path)

    def _GetCommonData(self, attribute_path):
      """Returns the attribute at attribute_path in the common file.

      Args:
        attribute_path: str, Dot separated path of keys in the common data.

      Raises:
        LayoutException: If there is no common data (the file could not be
          loaded, or it parsed to a falsy value), or the attribute is missing.

      Returns:
        The referenced data.
      """
      if not common_data:
        raise LayoutException(
            'Command [{}] references [common command] data but it does not '
            'exist.'.format(impl_path)
        )
      return self._GetAttribute(common_data, attribute_path, 'common command')

    def _GetRefData(self, path):
      """Loads the YAML data from the given reference.

      A YAML reference must refer to a YAML file and an attribute within that
      file to extract.

      Args:
        path: str, The path of the YAML file to import. It must be in the form
          of package.module:attribute.attribute, where the module path is
          separated from the sub attributes within the YAML by a ':'.

      Raises:
        LayoutException: If the given module or attribute cannot be loaded.

      Returns:
        The referenced YAML data.
      """
      parts = path.split(':')
      if len(parts) != 2:
        raise LayoutException(
            'Invalid Yaml reference: [{}]. References must be in the format: '
            'path(.path)+:attribute(.attribute)*'.format(path)
        )
      path_segments = parts[0].split('.')
      try:
        # Only the root package is imported; the remaining segments are
        # treated as a file path below that package's directory.
        root_module = importlib.import_module(path_segments[0])
        yaml_path = (
            os.path.join(
                os.path.dirname(root_module.__file__), *path_segments[1:]
            )
            + '.yaml'
        )
        data = _SafeLoadYamlFile(yaml_path)
      except (ImportError, IOError) as e:
        raise LayoutException(
            'Failed to load Yaml reference file [{}]: {}'.format(parts[0], e)
        )

      return self._GetAttribute(data, parts[1], yaml_path)

    def _GetAttribute(self, data, attribute_path, location):
      """Walks data along a dot separated path of keys.

      Args:
        data: The mapping to start from; each step calls .get() on the
          current value.
        attribute_path: str, Dot separated keys to follow.
        location: str, Description of where data came from, used in the error
          message.

      Raises:
        LayoutException: If any step along the path yields a falsy value.

      Returns:
        The value found at the end of the path.
      """
      value = data
      for attribute in attribute_path.split('.'):
        value = value.get(attribute, None)
        # NOTE(review): any falsy value (missing key, None, 0, False, empty
        # string/list/dict) is reported as "does not exist".
        if not value:
          raise LayoutException(
              'Command [{}] references [{}] data attribute [{}] in '
              'path [{}] but it does not exist.'.format(
                  impl_path, location, attribute, attribute_path
              )
          )
      return value

  loader = yaml.YAML()
  # Make the loader use the custom constructor class defined above.
  loader.Constructor = Constructor
  # Register the handlers for the `!COMMON` and `!REF` tags.
  loader.constructor.add_constructor(
      Constructor.INCLUDE_COMMON_MACRO, Constructor.IncludeCommon
  )
  loader.constructor.add_constructor(
      Constructor.INCLUDE_REF_MACRO, Constructor.IncludeRef
  )
  return loader


def _GetModuleFromPath(impl_file, path, construction_id):
  """Imports the module that implements a command or group.

  The module is registered under a synthetic name derived from the CLI path
  and the construction id, so it cannot collide with a real module name.

  Args:
    impl_file: str, The path to the file to load the module from.
    path: [str], The group names leading down to this element with respect to
      the CLI itself. Used to build the module name and for error reporting.
    construction_id: str, A unique identifier for the CLILoader that is being
      constructed.

  Raises:
    CommandLoadFailure: If importing the module fails for any reason.

  Returns:
    The imported module.
  """
  dotted_path = '.'.join(path)
  # Make sure this module name never collides with any real module name.
  # Use the CLI naming path, so values are always unique.
  module_name = '__calliope__command__.{construction_id}.{name}'.format(
      construction_id=construction_id, name=dotted_path.replace('-', '_')
  )
  try:
    return pkg_resources.GetModuleFromPath(module_name, impl_file)
  # pylint:disable=broad-except, We really do want to catch everything here,
  # because if any exceptions make it through for any single command or group
  # file, the whole CLI will not work. Instead, just log whatever it is.
  except Exception as e:
    exceptions.reraise(CommandLoadFailure(dotted_path, e))


def _ImplementationsFromModule(mod_file, module_attributes, is_command):
  """Gets all the release track command implementations from the module.

  Args:
    mod_file: str, The __file__ attribute of the module resulting from importing
      the file containing a command.
    module_attributes: The __dict__.values() of the module.
    is_command: bool, True if we are loading a command, False to load a group.

  Raises:
    LayoutException: If there is not exactly one type inheriting CommonBase.

  Returns:
    [(func->base._Common, [base.ReleaseTrack])], A list of tuples that can be
    passed to _ExtractReleaseTrackImplementation. Each item in this list
    represents a command implementation. The first element is a function that
    returns the implementation, and the second element is a list of release
    tracks it is valid for.
  """
  commands = []
  groups = []

  # Collect all the registered groups and commands.
  for command_or_group in module_attributes:
    if getattr(command_or_group, 'IS_COMMAND', False):
      commands.append(command_or_group)
    elif getattr(command_or_group, 'IS_COMMAND_GROUP', False):
      groups.append(command_or_group)

  if is_command:
    if groups:
      # Ensure that there are no groups if we are expecting a command.
      raise LayoutException(
          'You cannot define groups [{0}] in a command file: [{1}]'.format(
              ', '.join([g.__name__ for g in groups]), mod_file
          )
      )
    if not commands:
      # Make sure we found a command.
      raise LayoutException(
          'No commands defined in file: [{0}]'.format(mod_file)
      )
    commands_or_groups = commands
  else:
    # Ensure that there are no commands if we are expecting a group.
    if commands:
      raise LayoutException(
          'You cannot define commands [{0}] in a command group file: [{1}]'
          .format(', '.join([c.__name__ for c in commands]), mod_file)
      )
    if not groups:
      # Make sure we found a group.
      raise LayoutException(
          'No command groups defined in file: [{0}]'.format(mod_file)
      )
    commands_or_groups = groups

  # pylint:disable=undefined-loop-variable, Linter is just wrong here.
  # We need to use a default param on the lambda so that it captures the value
  # of the variable at the time in the loop or else the closure will just have
  # the last value that was iterated on.
  return [(lambda c=c: c, c.ValidReleaseTracks()) for c in commands_or_groups]


def _ImplementationsFromYaml(path, data, yaml_command_translator):
  """Builds the per-release-track implementations declared in yaml data.

  Args:
    path: [str], The group names leading down to this command with respect to
      the CLI itself. Used for error reporting and passed to the translator.
    data: dict, The loaded yaml data.
    yaml_command_translator: YamlCommandTranslator, The translator used to
      turn yaml data into commands.

  Raises:
    CommandLoadFailure: If no yaml command translator was provided.

  Returns:
    [(func->base._Common, [base.ReleaseTrack])], A list of tuples that can be
    passed to _ExtractReleaseTrackImplementation. Each tuple describes one
    implementation: a function returning the implementation, and the release
    tracks it is valid for.
  """
  if not yaml_command_translator:
    raise CommandLoadFailure(
        '.'.join(path),
        Exception('No yaml command translator has been registered'),
    )

  implementations = []
  separated = command_release_tracks.SeparateDeclarativeCommandTracks(data)
  for track_data in separated:
    # Bind track_data through a default parameter so each lambda translates
    # its own spec instead of the last one iterated on. Translation itself is
    # deferred until the lambda is called.
    translate = lambda track_data=track_data: (
        yaml_command_translator.Translate(path, track_data)
    )
    valid_tracks = {
        base.ReleaseTrack.FromId(t)
        for t in track_data.get('release_tracks', [])
    }
    implementations.append((translate, valid_tracks))
  return implementations


def _ExtractReleaseTrackImplementation(
    impl_file, expected_track, implementations
):
  """Validates and extracts the correct implementation of the command or group.

  Args:
    impl_file: str, The path to the file this was loaded from (for error
      reporting).
    expected_track: base.ReleaseTrack, The release track we are trying to load.
    implementations: [(func->base._Common, [base.ReleaseTrack])], A list of
      tuples where each item in this list represents a command implementation.
      The first element is a function that returns the implementation, and the
      second element is a list of release tracks it is valid for.

  Raises:
    LayoutException: If there is not exactly one type inheriting
        CommonBase.
    ReleaseTrackNotImplementedException: If there is no command or group
      implementation for the request release track.

  Returns:
    object, The single implementation that matches the expected release track.
  """
  # We found a single thing, if it's valid for this track, return it.
  if len(implementations) == 1:
    impl, valid_tracks = implementations[0]
    # If there is a single thing defined, and it does not declare any valid
    # tracks, just assume it is enabled for all tracks that it's parent is.
    if not valid_tracks or expected_track in valid_tracks:
      return impl
    raise ReleaseTrackNotImplementedException(
        'No implementation for release track [{0}] for element: [{1}]'.format(
            expected_track.id, impl_file
        )
    )

  # There was more than one thing found, make sure there are no conflicts.
  implemented_release_tracks = set()
  for impl, valid_tracks in implementations:
    # When there are multiple definitions, they need to explicitly register
    # their track to keep things sane.
    if not valid_tracks:
      raise LayoutException(
          'Multiple implementations defined for element: [{0}]. Each must '
          'explicitly declare valid release tracks.'.format(impl_file)
      )
    # Make sure no two classes define the same track.
    duplicates = implemented_release_tracks & valid_tracks
    if duplicates:
      raise LayoutException(
          'Multiple definitions for release tracks [{0}] for element: [{1}]'
          .format(', '.join([six.text_type(d) for d in duplicates]), impl_file)
      )
    implemented_release_tracks |= valid_tracks

  valid_commands_or_groups = [
      impl
      for impl, valid_tracks in implementations
      if expected_track in valid_tracks
  ]
  # We know there is at most 1 because of the above check.
  if len(valid_commands_or_groups) != 1:
    raise ReleaseTrackNotImplementedException(
        'No implementation for release track [{0}] for element: [{1}]'.format(
            expected_track.id, impl_file
        )
    )

  return valid_commands_or_groups[0]