HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/api_lib/compute/csek_utils.py
# -*- coding: utf-8 -*- #
# Copyright 2014 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for managing customer supplied encryption keys."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import base64
import json
import re

from googlecloudsdk.api_lib.compute import exceptions
from googlecloudsdk.core import resources
from googlecloudsdk.core.console import console_io
import six

CSEK_HELP_URL = ('https://cloud.google.com/compute/docs/disks/'
                 'customer-supplied-encryption')
EXPECTED_RECORD_KEY_KEYS = {'uri', 'key', 'key-type'}
BASE64_RAW_KEY_LENGTH_IN_CHARS = 44
BASE64_RSA_ENCRYPTED_KEY_LENGTH_IN_CHARS = 344


class Error(exceptions.Error):
  """Base exception for Csek(customer supplied encryption keys) exceptions."""


class InvalidKeyFileException(Error):
  """There's a problem in a CSEK file."""

  def __init__(self, base_message):
    super(InvalidKeyFileException, self).__init__(
        '{0}\nFor information on proper key file format see: '
        'https://cloud.google.com/compute/docs/disks/'
        'customer-supplied-encryption#key_file'.format(base_message))


class BadPatternException(InvalidKeyFileException):
  """A (e.g.) url pattern is bad and why."""

  def __init__(self, pattern_type, pattern):
    self.pattern_type = pattern_type
    self.pattern = pattern
    super(BadPatternException, self).__init__(
        'Invalid value for [{0}] pattern: [{1}]'.format(
            self.pattern_type,
            self.pattern))


class InvalidKeyExceptionNoContext(InvalidKeyFileException):
  """Indicate that a particular key is bad and why."""

  def __init__(self, key, issue):
    self.key = key
    self.issue = issue
    super(InvalidKeyExceptionNoContext, self).__init__(
        'Invalid key, [{0}] : {1}'.format(
            self.key,
            self.issue))


class InvalidKeyException(InvalidKeyFileException):
  """Indicate that a particular key is bad, why, and where."""

  def __init__(self, key, key_id, issue):
    self.key = key
    self.key_id = key_id
    self.issue = issue
    super(InvalidKeyException, self).__init__(
        'Invalid key, [{0}], for [{1}]: {2}'.format(
            self.key,
            self.key_id,
            self.issue))


def ValidateKey(base64_encoded_string, expected_key_length):
  """ValidateKey(s, k) returns None or raises InvalidKeyExceptionNoContext."""

  if expected_key_length < 1:
    raise ValueError('ValidateKey requires expected_key_length > 1.  Got {0}'
                     .format(expected_key_length))

  if len(base64_encoded_string) != expected_key_length:
    raise InvalidKeyExceptionNoContext(
        base64_encoded_string,
        'Key should contain {0} characters (including padding), '
        'but is [{1}] characters long.'.format(
            expected_key_length,
            len(base64_encoded_string)))

  if base64_encoded_string[-1] != '=':
    raise InvalidKeyExceptionNoContext(
        base64_encoded_string,
        'Bad padding.  Keys should end with an \'=\' character.')

  try:
    base64_encoded_string_as_str = base64_encoded_string.encode('ascii')
  except UnicodeDecodeError:
    raise InvalidKeyExceptionNoContext(
        base64_encoded_string,
        'Key contains non-ascii characters.')

  if not re.match(r'^[a-zA-Z0-9+/=]*$', base64_encoded_string):
    raise InvalidKeyExceptionNoContext(
        base64_encoded_string_as_str,
        'Key contains unexpected characters. Base64 encoded strings '
        'contain only letters (upper or lower case), numbers, '
        'plusses \'+\', slashes \'/\', or equality signs \'=\'.')

  try:
    base64.b64decode(base64_encoded_string_as_str)
  except TypeError as t:
    raise InvalidKeyExceptionNoContext(
        base64_encoded_string,
        'Key is not valid base64: [{0}].'.format(t.message))


class CsekKeyBase(six.with_metaclass(abc.ABCMeta, object)):
  """A class representing for CSEK keys."""

  def __init__(self, key_material):
    ValidateKey(key_material, expected_key_length=self.GetKeyLength())
    self._key_material = key_material

  @staticmethod
  def MakeKey(key_material, key_type, allow_rsa_encrypted=False):
    """Make a CSEK key.

    Args:
      key_material: str, the key material for this key
      key_type: str, the type of this key
      allow_rsa_encrypted: bool, whether the key is allowed to be RSA-wrapped

    Returns:
      CsekRawKey or CsekRsaEncryptedKey derived from the given key material and
      type.

    Raises:
      BadKeyTypeException: if the key is not a valid key type
    """

    if key_type == 'raw':
      return CsekRawKey(key_material)

    if key_type == 'rsa-encrypted':
      if allow_rsa_encrypted:
        return CsekRsaEncryptedKey(key_material)
      raise BadKeyTypeException(
          key_type,
          'this feature is only allowed in the alpha and beta versions of this '
          'command.')

    raise BadKeyTypeException(key_type)

  @abc.abstractmethod
  def GetKeyLength(self):
    raise NotImplementedError('GetKeyLength() must be overridden.')

  @abc.abstractmethod
  def ToMessage(self, compute_client):
    del compute_client
    raise NotImplementedError('ToMessage() must be overridden.')

  @property
  def key_material(self):
    return self._key_material


class CsekRawKey(CsekKeyBase):
  """Class representing raw CSEK keys."""

  def GetKeyLength(self):
    return BASE64_RAW_KEY_LENGTH_IN_CHARS

  def ToMessage(self, compute_client):
    return compute_client.MESSAGES_MODULE.CustomerEncryptionKey(
        rawKey=str(self.key_material))


class CsekRsaEncryptedKey(CsekKeyBase):
  """Class representing rsa encrypted CSEK keys."""

  def GetKeyLength(self):
    return BASE64_RSA_ENCRYPTED_KEY_LENGTH_IN_CHARS

  def ToMessage(self, compute_client):
    return compute_client.MESSAGES_MODULE.CustomerEncryptionKey(
        rsaEncryptedKey=str(self.key_material))


class BadKeyTypeException(InvalidKeyFileException):
  """A key type is bad and why."""

  def __init__(self, key_type, explanation=''):
    self.key_type = key_type
    msg = 'Invalid key type [{0}]'.format(self.key_type)
    if explanation:
      msg += ': ' + explanation
    msg += '.'
    super(BadKeyTypeException, self).__init__(msg)


class MissingCsekException(Error):

  def __init__(self, resource):
    super(MissingCsekException, self).__init__(
        'Key required for resource [{0}], but none found.'.format(resource))


def AddCsekKeyArgs(parser, flags_about_creation=True, resource_type='resource'):
  """Adds arguments related to csek keys."""
  parser.add_argument(
      '--csek-key-file',
      metavar='FILE',
      help="""\
      Path to a Customer-Supplied Encryption Key (CSEK) key file that maps
      Compute Engine {resource}s to user managed keys to be used when
      creating, mounting, or taking snapshots of disks.

      If you pass `-` as value of the flag, the CSEK is read from stdin.
      See {csek_help} for more details.
      """.format(resource=resource_type, csek_help=CSEK_HELP_URL))

  if flags_about_creation:
    parser.add_argument(
        '--require-csek-key-create',
        action='store_true',
        default=True,
        help="""\
        Refuse to create {resource}s not protected by a user managed key in
        the key file when --csek-key-file is given. This behavior is enabled
        by default to prevent incorrect gcloud invocations from accidentally
        creating {resource}s with no user managed key. Disabling the check
        allows creation of some {resource}s without a matching
        Customer-Supplied Encryption Key in the supplied --csek-key-file.
        See {csek_help} for more details.
        """.format(resource=resource_type, csek_help=CSEK_HELP_URL))


class UriPattern(object):
  """A uri-based pattern that maybe be matched against resource objects."""

  def __init__(self, path_as_string):
    if not path_as_string.startswith('http'):
      raise BadPatternException('uri', path_as_string)
    self._path_as_string = resources.REGISTRY.ParseURL(
        path_as_string).RelativeName()

  def Matches(self, resource):
    """Tests if its argument matches the pattern."""
    return self._path_as_string == resource.RelativeName()

  def __str__(self):
    return 'Uri Pattern: ' + self._path_as_string


class CsekKeyStore(object):
  """Represents a map from resource patterns to keys."""

  # Members
  # self._state: dictionary from UriPattern to an instance of (a subclass of)
  # CsekKeyBase

  @classmethod
  def FromFile(cls, fname, allow_rsa_encrypted):
    """FromFile loads a CsekKeyStore from a file.

    Args:
      fname: str, the name of a file intended to contain a well-formed key file
      allow_rsa_encrypted: bool, whether to allow keys of type 'rsa-encrypted'

    Returns:
      A CsekKeyStore, if found

    Raises:
      googlecloudsdk.core.util.files.Error: If the file cannot be read or is
                                            larger than max_bytes.
    """

    content = console_io.ReadFromFileOrStdin(fname, binary=False)
    return cls(content, allow_rsa_encrypted)

  @staticmethod
  def FromArgs(args, allow_rsa_encrypted=False):
    """FromFile attempts to load a CsekKeyStore from a command's args.

    Args:
      args: CLI args with a csek_key_file field set
      allow_rsa_encrypted: bool, whether to allow keys of type 'rsa-encrypted'

    Returns:
      A CsekKeyStore, if a valid key file name is provided as csek_key_file
      None, if args.csek_key_file is None

    Raises:
      exceptions.BadFileException: there's a problem reading fname
      exceptions.InvalidKeyFileException: the key file failed to parse
        or was otherwise invalid
    """
    if args.csek_key_file is None:
      return None

    return CsekKeyStore.FromFile(args.csek_key_file, allow_rsa_encrypted)

  @staticmethod
  def _ParseAndValidate(s, allow_rsa_encrypted=False):
    """_ParseAndValidate(s) inteprets s as a csek key file.

    Args:
      s: str, an input to parse
      allow_rsa_encrypted: bool, whether to allow RSA-wrapped keys

    Returns:
      a valid state object

    Raises:
      InvalidKeyFileException: if the input doesn't parse or is not well-formed.
    """

    assert isinstance(s, six.string_types)
    state = {}

    try:
      records = json.loads(s)

      if not isinstance(records, list):
        raise InvalidKeyFileException(
            'Key file\'s top-level element must be a JSON list.')

      for key_record in records:
        if not isinstance(key_record, dict):
          raise InvalidKeyFileException(
              'Key file records must be JSON objects, but [{0}] found.'.format(
                  json.dumps(key_record)))

        if set(key_record.keys()) != EXPECTED_RECORD_KEY_KEYS:
          raise InvalidKeyFileException(
              'Record [{0}] has incorrect json keys; [{1}] expected'.format(
                  json.dumps(key_record),
                  ','.join(EXPECTED_RECORD_KEY_KEYS)))

        pattern = UriPattern(key_record['uri'])

        try:
          state[pattern] = CsekKeyBase.MakeKey(
              key_material=key_record['key'], key_type=key_record['key-type'],
              allow_rsa_encrypted=allow_rsa_encrypted)
        except InvalidKeyExceptionNoContext as e:
          raise InvalidKeyException(key=e.key, key_id=pattern, issue=e.issue)

    except ValueError as e:
      raise InvalidKeyFileException(*e.args)

    assert isinstance(state, dict)
    return state

  def __len__(self):
    return len(self.state)

  def LookupKey(self, resource, raise_if_missing=False):
    """Search for the unique key corresponding to a given resource.

    Args:
      resource: the resource to find a key for.
      raise_if_missing: bool, raise an exception if the resource is not found.

    Returns: CsekKeyBase, corresponding to the resource, or None if not found
      and not raise_if_missing.

    Raises:
      InvalidKeyFileException: if there are two records matching the resource.
      MissingCsekException: if raise_if_missing and no key is found
        for the provided resource.
    """

    assert isinstance(self.state, dict)
    search_state = (None, None)

    for pat, key in six.iteritems(self.state):
      if pat.Matches(resource):
        if search_state[0]:
          raise InvalidKeyFileException(
              'Uri patterns [{0}] and [{1}] both match '
              'resource [{2}].  Bailing out.'.format(
                  search_state[0], pat, str(resource)))

        search_state = (pat, key)

    if raise_if_missing and (search_state[1] is None):
      raise MissingCsekException(resource)

    return search_state[1]

  def __init__(self, json_string, allow_rsa_encrypted=False):
    self.state = CsekKeyStore._ParseAndValidate(json_string,
                                                allow_rsa_encrypted)


# Functions below make it easy for clients to operate on values that possibly
# either CsekKeyStores or None or else CsekKeyBases or None.  Fellow functional
# programming geeks: basically we're faking the Maybe monad.
def MaybeToMessage(csek_key_or_none, compute):
  return csek_key_or_none.ToMessage(compute) if csek_key_or_none else None


def MaybeLookupKey(csek_keys_or_none, resource):
  if csek_keys_or_none and resource:
    return csek_keys_or_none.LookupKey(resource)

  return None


def MaybeLookupKeyMessage(csek_keys_or_none, resource, compute_client):
  maybe_key = MaybeLookupKey(csek_keys_or_none, resource)
  return MaybeToMessage(maybe_key, compute_client)


def MaybeLookupKeys(csek_keys_or_none, resource_collection):
  return [MaybeLookupKey(csek_keys_or_none, r) for r in resource_collection]


def MaybeLookupKeyMessages(
    csek_keys_or_none, resource_collection, compute_client):
  return [MaybeToMessage(k, compute_client) for k in
          MaybeLookupKeys(csek_keys_or_none, resource_collection)]


def MaybeLookupKeysByUri(csek_keys_or_none, parser, uris):
  return MaybeLookupKeys(
      csek_keys_or_none,
      [(parser.Parse(u) if u else None) for u in uris])


def MaybeLookupKeyMessagesByUri(csek_keys_or_none, parser,
                                uris, compute_client):
  return [MaybeToMessage(k, compute_client) for k in
          MaybeLookupKeysByUri(csek_keys_or_none, parser, uris)]