HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/composer/image_versions_util.py
# -*- coding: utf-8 -*- #
# Copyright 2018 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common utility functions for Image Version validation."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import collections
import re

from googlecloudsdk.api_lib.composer import environments_util as environments_api_util
from googlecloudsdk.api_lib.composer import image_versions_util as image_version_api_util
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.composer import flags
from googlecloudsdk.command_lib.composer import util as command_util
from googlecloudsdk.core.util import semver

# Envs must be running at least this version of Composer to be upgradeable.
MIN_UPGRADEABLE_COMPOSER_VER = '1.0.0'

# Required for version comparisons
COMPOSER_LATEST_VERSION_PLACEHOLDER = '2.1.12'

UpgradeValidator = collections.namedtuple('UpgradeValidator',
                                          ['upgrade_valid', 'error'])

# Major version that is used as a replacement for the 'latest' alias.
COMPOSER_LATEST_MAJOR_VERSION = 2


class InvalidImageVersionError(command_util.Error):
  """Class for errors raised when an invalid image version is encountered."""


class _ImageVersionItem(object):
  """Class used to dissect and analyze image version components and strings."""

  def __init__(self, image_ver=None, composer_ver=None, airflow_ver=None):
    image_version_regex = r'^composer-(\d+(?:(?:\.\d+\.\d+(?:-[a-z]+\.\d+)?)?)?|latest)-airflow-(\d+(?:\.\d+(?:\.\d+)?)?)'
    composer_version_alias_regex = r'^(\d+|latest)$'
    airflow_version_alias_regex = r'^(\d+|\d+\.\d+)$'

    if image_ver is not None:
      iv_parts = re.findall(image_version_regex, image_ver)[0]
      self.composer_ver = iv_parts[0]
      self.airflow_ver = iv_parts[1]

    if composer_ver is not None:
      self.composer_ver = composer_ver

    if airflow_ver is not None:
      self.airflow_ver = airflow_ver

    # Determines the state of aliases
    self.composer_contains_alias = re.match(composer_version_alias_regex,
                                            self.composer_ver)
    self.airflow_contains_alias = re.match(airflow_version_alias_regex,
                                           self.airflow_ver)

  def GetImageVersionString(self):
    return 'composer-{}-airflow-{}'.format(self.composer_ver, self.airflow_ver)


def ListImageVersionUpgrades(env_ref, release_track=base.ReleaseTrack.GA):
  """List of available image version upgrades for provided env_ref."""
  env_details = environments_api_util.Get(env_ref, release_track)
  proj_location_ref = env_ref.Parent()
  cur_image_version_id = env_details.config.softwareConfig.imageVersion
  cur_python_version = env_details.config.softwareConfig.pythonVersion

  return _BuildUpgradeCandidateList(proj_location_ref, cur_image_version_id,
                                    cur_python_version, release_track)


def IsValidImageVersionUpgrade(cur_image_version_str,
                               image_version_id):
  """Checks if image version candidate is a valid upgrade for environment."""

  # Checks for the use of an alias and confirms that a valid airflow upgrade has
  # been requested.
  cur_image_ver = _ImageVersionItem(
      image_ver=cur_image_version_str)

  is_composer3 = IsVersionComposer3Compatible(
      cur_image_version_str
  )
  if not is_composer3 and not (
      CompareVersions(MIN_UPGRADEABLE_COMPOSER_VER, cur_image_ver.composer_ver)
      <= 0
  ):
    raise InvalidImageVersionError(
        'This environment does not support upgrades.')
  return _ValidateCandidateImageVersionId(
      cur_image_version_str,
      image_version_id)


def ImageVersionFromAirflowVersion(new_airflow_version, cur_image_version=None):
  """Converts airflow-version string into a image-version string."""

  is_composer3 = cur_image_version and IsVersionComposer3Compatible(
      cur_image_version
  )
  composer_ver = (
      _ImageVersionItem(cur_image_version).composer_ver
      if is_composer3
      else 'latest'
  )

  return _ImageVersionItem(
      composer_ver=composer_ver,
      airflow_ver=new_airflow_version).GetImageVersionString()


def IsImageVersionStringComposerV1(image_version):
  """Checks if string composer-X.Y.Z-airflow-A.B.C is Composer v1 version."""
  return image_version is not None and (
      image_version.startswith('composer-1.')
      or image_version.startswith('composer-1-')
  )


def IsDefaultImageVersion(image_version):
  return image_version is None or image_version.startswith('composer-latest')


def BuildDefaultComposerVersionWarning(image_version, airflow_version):
  """Builds warning message about using default Composer version."""
  message = (
      '{} resolves to Cloud Composer current default version, which is'
      ' presently Composer 2 and is subject to'
      ' further changes in the future. Consider using'
      ' --image-version=composer-A-airflow-X[.Y[.Z]]. More info at'
      ' https://cloud.google.com/composer/docs/concepts/versioning/composer-versioning-overview#version-aliases'
  )
  if airflow_version:
    return message.format('Using --airflow-version=X[.Y[.Z]]')
  if image_version:
    return message.format(
        'Using --image-version=composer-latest-airflow-X[.Y[.Z]]'
    )
  return message.format('Not defining --image-version')


def _CompareVersions(v1, v2):
  """Compares versions."""
  if v1 == v2:
    return 0
  elif v1 > v2:
    return 1
  else:
    return -1


def CompareLooseVersions(v1, v2):
  """Compares loose version strings.

  Args:
    v1: first loose version string
    v2: second loose version string

  Returns:
    Value == 1 when v1 is greater; Value == -1 when v2 is greater; otherwise 0.
  """
  v1, v2 = _VersionStrToLooseVersion(v1), _VersionStrToLooseVersion(v2)
  return _CompareVersions(v1, v2)


def CompareVersions(v1, v2):
  """Compares semantic version strings.

  Args:
    v1: first semantic version string
    v2: second semantic version string

  Returns:
    Value == 1 when v1 is greater; Value == -1 when v2 is greater; otherwise 0.
  """
  v1, v2 = _VersionStrToSemanticVersion(v1), _VersionStrToSemanticVersion(v2)
  return _CompareVersions(v1, v2)


def IsVersionComposer3Compatible(image_version):
  """Checks if given `image_version` is compatible with Composer 3.

  Args:
    image_version: image version str that includes Composer version.

  Returns:
    True if Composer version is greater than or equal to 3.0.0 or its prerelease
    variant, otherwise False.
  """

  if image_version:
    version_item = _ImageVersionItem(image_version)
    if version_item and version_item.composer_ver:
      composer_version = version_item.composer_ver
      if composer_version == '3':
        return True
      if composer_version == 'latest':
        composer_version = COMPOSER_LATEST_VERSION_PLACEHOLDER
      return IsVersionInRange(
          composer_version, flags.MIN_COMPOSER3_VERSION, None,
          True)
  return False


def IsVersionAirflowCommandsApiCompatible(image_version):
  """Checks if given `version` is compatible with Composer Airflow Commands API.

  Args:
    image_version: image version str that includes Composer version.

  Returns:
    True if Composer version is compatible with Aiflow Commands API,
    otherwise False.
  """

  if image_version:
    version_item = _ImageVersionItem(image_version)
    if version_item and version_item.composer_ver:
      composer_version = version_item.composer_ver
      return IsVersionInRange(
          composer_version,
          flags.MIN_COMPOSER_RUN_AIRFLOW_CLI_VERSION,
          None,
          True,
      )
  return False


def IsVersionTriggererCompatible(image_version):
  """Checks if given `version` is compatible with triggerer .

  Args:
    image_version: image version str that includes airflow version.

  Returns:
    True if given airflow version is compatible with Triggerer(>=2.2.x)
    and Composer version is >=2.0.31 otherwise False
  """

  if image_version:
    version_item = _ImageVersionItem(image_version)
    # Triggerer is supported in Composer 3.
    if IsVersionComposer3Compatible(image_version):
      return True
    if version_item and version_item.airflow_ver and version_item.composer_ver:
      airflow_version = version_item.airflow_ver
      composer_version = version_item.composer_ver
      if composer_version == 'latest':
        composer_version = COMPOSER_LATEST_VERSION_PLACEHOLDER
      return IsVersionInRange(
          composer_version, flags.MIN_TRIGGERER_COMPOSER_VERSION, None, True
      ) and IsVersionInRange(
          airflow_version, flags.MIN_TRIGGERER_AIRFLOW_VERSION, None, True
      )
  return False


def IsVersionInRange(version, range_from, range_to, loose=False):
  """Checks if given `version` is in range of (`range_from`, `range_to`).

  Args:
    version: version to check
    range_from: left boundary of range (inclusive), if None - no boundary
    range_to: right boundary of range (exclusive), if None - no boundary
    loose: if true use LooseVersion to compare, use SemVer otherwise

  Returns:
    True if given version is in range, otherwise False.
  """
  compare_fn = CompareLooseVersions if loose else CompareVersions
  return ((range_from is None or compare_fn(range_from, version) <= 0) and
          (range_to is None or compare_fn(version, range_to) < 0))


def _BuildUpgradeCandidateList(location_ref,
                               image_version_id,
                               python_version,
                               release_track=base.ReleaseTrack.GA):
  """Builds a list of eligible image version upgrades."""
  image_version_service = image_version_api_util.ImageVersionService(
      release_track)
  image_version_item = _ImageVersionItem(image_version_id)

  available_upgrades = []
  # Checks if current composer version meets minimum threshold.
  if (
      IsVersionComposer3Compatible(image_version_id)
      or CompareVersions(
          MIN_UPGRADEABLE_COMPOSER_VER, image_version_item.composer_ver
      )
      <= 0
  ):
    # If so, builds list of eligible upgrades.
    for version in image_version_service.List(location_ref):
      if _ValidateCandidateImageVersionId(
          image_version_id, version.imageVersionId
      ).upgrade_valid and (
          not python_version
          or python_version in version.supportedPythonVersions
      ):
        available_upgrades.append(version)
  else:
    raise InvalidImageVersionError(
        'This environment does not support upgrades.')

  return available_upgrades


def _GetComposerMajorVersion(composer_ver_alias):
  if composer_ver_alias == 'latest':
    return COMPOSER_LATEST_MAJOR_VERSION
  return int(composer_ver_alias)


def _IsComposerMajorOnlyVersionUpgradeCompatible(parsed_curr, parsed_cand):
  """Validates whether Composer major only version upgrade is compatible."""

  if parsed_curr.composer_contains_alias:
    major_version_curr = _GetComposerMajorVersion(
        parsed_curr.composer_contains_alias[0]
    )
  else:
    major_version_curr = semver.SemVer(parsed_curr.composer_ver).major
  if parsed_cand.composer_contains_alias:
    major_version_cand = _GetComposerMajorVersion(
        parsed_cand.composer_contains_alias[0]
    )
  else:
    major_version_cand = semver.SemVer(parsed_cand.composer_ver).major

  # Allow major version upgrades only between Composer 2 and Composer 3.
  return UpgradeValidator(
      major_version_curr == major_version_cand
      or (major_version_curr == 2 and major_version_cand == 3),
      None,
  )


def _ValidateCandidateImageVersionId(
    current_image_version_id, candidate_image_version_id
):
  """Determines if candidate version is a valid upgrade from current version.

  Args:
    current_image_version_id: current image version
    candidate_image_version_id: image version requested for upgrade

  Returns:
    UpgradeValidator namedtuple containing True and None error message if
    given version upgrade between given versions is valid, otherwise False and
    error message with problems description.
  """
  upgrade_validator = UpgradeValidator(True, None)
  if current_image_version_id == candidate_image_version_id:
    error_message = ('Existing and requested image versions are equal ({}). '
                     'Select image version newer than current to perform '
                     'upgrade.').format(current_image_version_id)
    upgrade_validator = UpgradeValidator(False, error_message)

  parsed_curr = _ImageVersionItem(image_ver=current_image_version_id)
  parsed_cand = _ImageVersionItem(image_ver=candidate_image_version_id)

  has_alias_or_major_only_composer_ver = (
      parsed_cand.composer_contains_alias
      or parsed_curr.composer_contains_alias
  )

  # Checks Composer versions.
  if has_alias_or_major_only_composer_ver:
    upgrade_validator = _IsComposerMajorOnlyVersionUpgradeCompatible(
        parsed_curr, parsed_cand
    )
  elif (
      upgrade_validator.upgrade_valid
  ):
    upgrade_validator = _IsVersionUpgradeCompatible(
        parsed_curr.composer_ver, parsed_cand.composer_ver, 'Composer'
    )

  # Checks Airflow versions.
  if upgrade_validator.upgrade_valid and not parsed_cand.airflow_contains_alias:
    upgrade_validator = _IsVersionUpgradeCompatible(parsed_curr.airflow_ver,
                                                    parsed_cand.airflow_ver,
                                                    'Airflow')

  # Leaves the validity check to the Composer backend request validation.
  return upgrade_validator


def _VersionStrToSemanticVersion(version_str):
  """Parses version_str into semantic version."""
  return semver.SemVer(version_str)


def _VersionStrToLooseVersion(version_str):
  """Parses version_str into loose version."""
  return semver.LooseVersion(version_str)


def _IsVersionUpgradeCompatible(cur_version, candidate_version,
                                image_version_part):
  """Validates whether version candidate is greater than or equal to current.

  Applicable both for Airflow and Composer version upgrades. Composer supports
  both Airflow and self MINOR and PATCH-level upgrades.

  Args:
    cur_version: current 'a.b.c' version
    candidate_version: candidate 'x.y.z' version
    image_version_part: part of image to be validated. Must be either 'Airflow'
      or 'Composer'

  Returns:
    UpgradeValidator namedtuple containing boolean value whether selected image
    version component is valid for upgrade and eventual error message if it is
    not.
  """
  assert image_version_part in ['Airflow', 'Composer']
  curr_semantic_version = _VersionStrToSemanticVersion(cur_version)
  cand_semantic_version = _VersionStrToSemanticVersion(candidate_version)

  if curr_semantic_version > cand_semantic_version:
    error_message = ('Upgrade cannot decrease {composer_or_airflow1}\'s '
                     'version. Current {composer_or_airflow2} version: '
                     '{cur_version}, requested {composer_or_airflow3} version: '
                     '{req_version}.').format(
                         composer_or_airflow1=image_version_part,
                         composer_or_airflow2=image_version_part,
                         cur_version=cur_version,
                         composer_or_airflow3=image_version_part,
                         req_version=candidate_version)
    return UpgradeValidator(False, error_message)

  # Allow only version upgrades only between Composer 2 and Composer 3.
  if (
      (curr_semantic_version.major != 2 or cand_semantic_version.major != 3)
      and curr_semantic_version.major != cand_semantic_version.major
  ):
    error_message = (
        "Upgrades between different {}'s major versions are not"
        ' supported. Current major version {}, requested major '
        'version {}.'
    ).format(
        image_version_part,
        curr_semantic_version.major,
        cand_semantic_version.major,
    )
    return UpgradeValidator(False, error_message)

  return UpgradeValidator(True, None)