HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/compute/sign_url_utils.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for generating Cloud CDN Signed URLs."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import base64
import hashlib
import hmac
import time

from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import requests
from googlecloudsdk.core.util import encoding
import six.moves.urllib.parse

_URL_SCHEME_MUST_BE_HTTP_HTTPS_MESSAGE = (
    'The URL scheme must be either HTTP or HTTPS.')
_URL_MUST_NOT_HAVE_PARAM_MESSAGE = (
    'The URL must not have a \'{}\' query parameter')
_URL_MUST_NOT_HAVE_FRAGMENT_MESSAGE = ('The URL must not have a fragment.')

_DISALLOWED_QUERY_PARAMETERS = ('Expires', 'KeyName', 'Signature')


class InvalidCdnSignedUrlError(core_exceptions.Error):
  """Invalid URL error."""


def _GetSignature(key, url):
  """Gets the base64url encoded HMAC-SHA1 signature of the specified URL.

  Args:
    key: The key value to use for signing.
    url: The url to use for signing.

  Returns:
    The signature of the specified URL calculated using HMAC-SHA1 signature
    digest and encoding the result using base64url.
  """
  signature = hmac.new(key, url, hashlib.sha1).digest()
  return base64.urlsafe_b64encode(signature)


def _SecondsFromNowToUnixTimestamp(seconds_from_now):
  """Converts the number of seconds from now into a unix timestamp."""
  return int(time.time() + seconds_from_now)


def SignUrl(url, key_name, encoded_key_value, validity_seconds):
  """Gets the Signed URL string for the specified URL and configuration.

  Args:
    url: The URL to sign.
    key_name: Signed URL key name to use for the 'KeyName=' query parameter.
    encoded_key_value: The base64url encoded key value to use for signing.
    validity_seconds: The number of seconds for which this signed URL will
        be valid, starting when this function is called.

  Returns:
    Returns the Signed URL appended with the query parameters based on the
    specified configuration.

  Raises:
    InvalidCdnSignedUrlError: if the URL is invalid and/or failed to parse the
        URL.
  """
  stripped_url = url.strip()

  parsed_url = six.moves.urllib.parse.urlsplit(stripped_url)
  if parsed_url.scheme != 'https' and parsed_url.scheme != 'http':
    raise InvalidCdnSignedUrlError(_URL_SCHEME_MUST_BE_HTTP_HTTPS_MESSAGE)

  if parsed_url.fragment:
    raise InvalidCdnSignedUrlError(_URL_MUST_NOT_HAVE_FRAGMENT_MESSAGE)

  query_params = six.moves.urllib.parse.parse_qs(
      parsed_url.query, keep_blank_values=True)

  for param in _DISALLOWED_QUERY_PARAMETERS:
    if param in query_params:
      raise InvalidCdnSignedUrlError(
          _URL_MUST_NOT_HAVE_PARAM_MESSAGE.format(param))

  # Append the query parameters to the URL and calculate the signature.
  # Do not use the urllib.urlencode()/urlparse.urlunsplit() as the conversion
  # might not yield the exact URL again (it will still be equivalent) and
  # we do not want to modify the user's URL.
  url_to_sign = '{url}{separator}Expires={expires}&KeyName={keyName}'.format(
      url=stripped_url,
      separator='&' if query_params else '?',
      expires=_SecondsFromNowToUnixTimestamp(validity_seconds),
      keyName=key_name)

  # Append the signature as another query parameter.
  signature = _GetSignature(
      base64.urlsafe_b64decode(encoded_key_value), url_to_sign.encode('utf-8'))
  return '{url}&Signature={signature}'.format(
      url=url_to_sign, signature=encoding.Decode(signature))


def ValidateSignedUrl(signed_url):
  """Validates the Signed URL by returning the response code for HEAD request.

  Args:
    signed_url: The Signed URL which should be validated.

  Returns:
    Returns the response code for the HEAD request to the specified Signed
        URL.
  """
  http_client = requests.GetSession()
  http_response = http_client.request('HEAD', signed_url)
  return http_response.status_code