HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/third_party/containerregistry/client/docker_creds_.py
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This package exposes credentials for talking to a Docker registry."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import base64
import errno
import io
import json
import logging
import os
import subprocess

from containerregistry.client import docker_name
import httplib2
from oauth2client import client as oauth2client

import six


class Provider(six.with_metaclass(abc.ABCMeta, object)):
  """Interface for providing User Credentials for use with a Docker Registry."""

  # pytype: disable=bad-return-type
  @abc.abstractmethod
  def Get(self):
    """Produces a value suitable for use in the Authorization header."""
  # pytype: enable=bad-return-type


class Anonymous(Provider):
  """Implementation for anonymous access."""

  def Get(self):
    """Implement anonymous authentication."""
    return ''


class SchemeProvider(Provider):
  """Implementation for providing a challenge response credential."""

  def __init__(self, scheme):
    self._scheme = scheme

  # pytype: disable=bad-return-type
  @property
  @abc.abstractmethod
  def suffix(self):
    """Returns the authentication payload to follow the auth scheme."""
  # pytype: enable=bad-return-type

  def Get(self):
    """Gets the credential in a form suitable for an Authorization header."""
    return u'%s %s' % (self._scheme, self.suffix)


class Basic(SchemeProvider):
  """Implementation for providing a username/password-based creds."""

  def __init__(self, username, password):
    super(Basic, self).__init__('Basic')
    self._username = username
    self._password = password

  @property
  def username(self):
    return self._username

  @property
  def password(self):
    return self._password

  @property
  def suffix(self):
    u = self.username.encode('utf8')
    p = self.password.encode('utf8')
    return base64.b64encode(u + b':' + p).decode('utf8')


_USERNAME = '_token'


class OAuth2(Basic):
  """Base class for turning OAuth2Credentials into suitable GCR credentials."""

  def __init__(self, creds,
               transport):
    """Constructor.

    Args:
      creds: the credentials from which to retrieve access tokens.
      transport: the http transport to use for token exchanges.
    """
    super(OAuth2, self).__init__(_USERNAME, 'does not matter')
    self._creds = creds
    self._transport = transport

  @property
  def password(self):
    # WORKAROUND...
    # The python oauth2client library only loads the credential from an
    # on-disk cache the first time 'refresh()' is called, and doesn't
    # actually 'Force a refresh of access_token' as advertised.
    # This call will load the credential, and the call below will refresh
    # it as needed.  If the credential is unexpired, the call below will
    # simply return a cache of this refresh.
    unused_at = self._creds.get_access_token(http=self._transport)

    # Most useful API ever:
    # https://www.googleapis.com/oauth2/v1/tokeninfo?access_token={at}
    return self._creds.get_access_token(http=self._transport).access_token


_MAGIC_NOT_FOUND_MESSAGE = 'credentials not found in native keychain'


class Helper(Basic):
  """This provider wraps a particularly named credential helper."""

  def __init__(self, name, registry):
    """Constructor.

    Args:
      name: the name of the helper, as it appears in the Docker config.
      registry: the registry for which we're invoking the helper.
    """
    super(Helper, self).__init__('does not matter', 'does not matter')
    self._name = name
    self._registry = registry.registry

  def Get(self):
    # Invokes:
    #   echo -n {self._registry} | docker-credential-{self._name} get
    # The resulting JSON blob will have 'Username' and 'Secret' fields.

    bin_name = 'docker-credential-{name}'.format(name=self._name)
    logging.info('Invoking %r to obtain Docker credentials.', bin_name)
    try:
      p = subprocess.Popen(
          [bin_name, 'get'],
          stdout=subprocess.PIPE,
          stdin=subprocess.PIPE,
          stderr=subprocess.STDOUT)
    except OSError as e:
      if e.errno == errno.ENOENT:
        raise Exception('executable not found: ' + bin_name)
      raise

    # Some keychains expect a scheme:
    # https://github.com/bazelbuild/rules_docker/issues/111
    stdout = p.communicate(
        input=('https://' + self._registry).encode('utf-8'))[0]
    if stdout.strip() == _MAGIC_NOT_FOUND_MESSAGE:
      # Use empty auth when no auth is found.
      logging.info('Credentials not found, falling back to anonymous auth.')
      return Anonymous().Get()

    if p.returncode != 0:
      raise Exception('Error fetching credential for %s, exit status: %d\n%s' %
                      (self._name, p.returncode, stdout))

    blob = json.loads(stdout.decode('utf-8'))
    logging.info('Successfully obtained Docker credentials.')
    return Basic(blob['Username'], blob['Secret']).Get()


class Keychain(six.with_metaclass(abc.ABCMeta, object)):
  """Interface for resolving an image reference to a credential."""

  # pytype: disable=bad-return-type
  @abc.abstractmethod
  def Resolve(self, name):
    """Resolves the appropriate credential for the given registry.

    Args:
      name: the registry for which we need a credential.

    Returns:
      a Provider suitable for use with registry operations.
    """
  # pytype: enable=bad-return-type


_FORMATS = [
    # Allow naked domains
    '%s',
    # Allow scheme-prefixed.
    'https://%s',
    'http://%s',
    # Allow scheme-prefixes with version in url path.
    'https://%s/v1/',
    'http://%s/v1/',
    'https://%s/v2/',
    'http://%s/v2/',
]


def _GetUserHomeDir():
  if os.name == 'nt':
    # %HOME% has precedence over %USERPROFILE% for os.path.expanduser('~')
    # The Docker config resides under %USERPROFILE% on Windows
    return os.path.expandvars('%USERPROFILE%')
  else:
    return os.path.expanduser('~')


def _GetConfigDirectory():
  # Return the value of $DOCKER_CONFIG, if it exists, otherwise ~/.docker
  # see https://github.com/docker/docker/blob/master/cliconfig/config.go
  if os.environ.get('DOCKER_CONFIG') is not None:
    return os.environ.get('DOCKER_CONFIG')
  else:
    return os.path.join(_GetUserHomeDir(), '.docker')


class _DefaultKeychain(Keychain):
  """This implements the default docker credential resolution."""

  def __init__(self):
    # Store a custom directory to get the Docker configuration JSON from
    self._config_dir = None
    # Name of the docker configuration JSON file to look for in the
    # configuration directory
    self._config_file = 'config.json'

  def setCustomConfigDir(self, config_dir):
    # Override the configuration directory where the docker configuration
    # JSON is searched for
    if not os.path.isdir(config_dir):
      raise Exception('Attempting to override docker configuration directory'
                      ' to invalid directory: {}'.format(config_dir))
    self._config_dir = config_dir

  def Resolve(self, name):
    # TODO(user): Consider supporting .dockercfg, which was used prior
    # to Docker 1.7 and consisted of just the contents of 'auths' below.
    logging.info('Loading Docker credentials for repository %r', str(name))
    config_file = None
    if self._config_dir is not None:
      config_file = os.path.join(self._config_dir, self._config_file)
    else:
      config_file = os.path.join(_GetConfigDirectory(), self._config_file)
    try:
      with io.open(config_file, u'r', encoding='utf8') as reader:
        cfg = json.loads(reader.read())
    except IOError:
      # If the file doesn't exist, fallback on anonymous auth.
      return Anonymous()

    # Per-registry credential helpers take precedence.
    cred_store = cfg.get('credHelpers', {})
    for form in _FORMATS:
      if form % name.registry in cred_store:
        return Helper(cred_store[form % name.registry], name)

    # A global credential helper is next in precedence.
    if 'credsStore' in cfg:
      return Helper(cfg['credsStore'], name)

    # Lastly, the 'auths' section directly contains basic auth entries.
    auths = cfg.get('auths', {})
    for form in _FORMATS:
      if form % name.registry in auths:
        entry = auths[form % name.registry]
        if 'auth' in entry:
          decoded = base64.b64decode(entry['auth']).decode('utf8')
          username, password = decoded.split(':', 1)
          return Basic(username, password)
        elif 'username' in entry and 'password' in entry:
          return Basic(entry['username'], entry['password'])
        else:
          # TODO(user): Support identitytoken
          # TODO(user): Support registrytoken
          raise Exception(
              'Unsupported entry in "auth" section of Docker config: ' +
              json.dumps(entry))

    return Anonymous()


# pylint: disable=invalid-name
DefaultKeychain = _DefaultKeychain()