HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/core/docker/docker.py
# -*- coding: utf-8 -*- #
# Copyright 2014 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility library for configuring access to the Google Container Registry.

Sets docker up to authenticate with the Google Container Registry using the
active gcloud credential.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import base64
import json
import os
import subprocess
import sys

from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core.credentials import store
from googlecloudsdk.core.docker import client_lib
from googlecloudsdk.core.docker import constants
from googlecloudsdk.core.util import files
import six


# Username handed to docker together with the gcloud access token.
_USERNAME = 'gclouddockertoken'
# Value written to the 'email' field of the auth entries this module creates.
_EMAIL = 'not@val.id'
# Key looked up in the new-format docker config to detect a credential store.
_CREDENTIAL_STORE_KEY = 'credsStore'


class UnsupportedRegistryError(client_lib.DockerError):
  """Raised when an image URL points at a registry that is not supported.

  Attributes:
    image_url: The offending image URL, exactly as given by the caller.
  """

  def __init__(self, image_url):
    # Only the URL is recorded; the base-class initializer is not called.
    self.image_url = image_url

  def __str__(self):
    message_template = (
        '{0} is not in a supported registry.  Supported registries are {1}')
    return message_template.format(
        self.image_url, constants.ALL_SUPPORTED_REGISTRIES)


def DockerLogin(server, username, access_token):
  """Runs 'docker login' to register a username / token pair for a server.

  Args:
    server: The registry to log in to. It is normalized with
      client_lib.GetNormalizedURL before being handed to docker.
    username: The username passed to docker via --username.
    access_token: The token passed to docker via --password.

  Raises:
    client_lib.DockerError: If the docker process exits with a non-zero code.
  """
  # Sanitize and normalize the server input.
  registry_url = client_lib.GetNormalizedURL(server).geturl()

  # 'docker login' must be used due to the change introduced in
  # https://github.com/docker/docker/pull/20107 .
  login_args = [
      'login',
      '--username=' + username,
      '--password=' + access_token,
      registry_url,  # The auth endpoint must be the last argument.
  ]

  login_process = client_lib.GetDockerProcess(
      login_args,
      stdin_file=sys.stdin,
      stdout_file=subprocess.PIPE,
      stderr_file=subprocess.PIPE)

  # Block until docker has finished executing and collect its stdout/stderr.
  captured_out, captured_err = login_process.communicate()

  if login_process.returncode != 0:
    # The login failed, so print everything docker reported.
    log.error('Docker CLI operation failed:')
    log.out.Print(captured_out)
    log.status.Print(captured_err)
    raise client_lib.DockerError('Docker login failed.')

  # The login succeeded, so print only unexpected info.
  _SurfaceUnexpectedInfo(captured_out, captured_err)


def _SplitOutputLines(data):
  """Returns the whitespace-stripped text lines of a captured output stream.

  Args:
    data: The output captured from a pipe, as text or bytes, or None if
      nothing was captured.

  Returns:
    A list of text strings, one per line of output.
  """
  if data is None:
    return []
  if isinstance(data, bytes):
    # Popen.communicate() yields bytes unless the pipe was opened in text
    # mode. Decode so that callers compare text with text; undecodable bytes
    # are replaced rather than aborting a login that already succeeded.
    data = data.decode('utf-8', 'replace')
  return [line.strip() for line in data.splitlines()]


def _SurfaceUnexpectedInfo(stdoutdata, stderrdata):
  """Reads docker's output and surfaces unexpected lines.

  Docker's CLI has a certain amount of chattiness, even on successes. Lines
  that are known to be harmless are swallowed; everything else is forwarded
  to the user.

  Args:
    stdoutdata: The output read from the pipe given to Popen as stdout. May
      be text, bytes, or None.
    stderrdata: The output read from the pipe given to Popen as stderr. May
      be text, bytes, or None.
  """
  for line in _SplitOutputLines(stdoutdata):
    # Swallow 'Login Succeeded' and 'saved in,' surface any other std output.
    if line != 'Login Succeeded' and 'login credentials saved in' not in line:
      log.out.Print('%s%s' % (line, os.linesep))  # log.out => stdout

  for line in _SplitOutputLines(stderrdata):
    if not _IsExpectedErrorLine(line):
      log.status.Print('%s%s' % (line, os.linesep))  # log.status => stderr


def _CredentialStoreConfigured():
  """Reports whether the docker config names a credential store.

  Returns:
    True if the new-format docker config contains a 'credsStore' key.
    False if the config is in the old format, lacks that key, or reading it
    raises IOError.
  """
  try:
    # DockerConfigInfo is not used here in order to stay backward compatible
    # with UpdateDockerCredentials, which should still work if Docker is not
    # installed.
    config_path, uses_new_format = client_lib.GetDockerConfigPath()
    config = client_lib.ReadConfigurationFile(config_path)
  except IOError:
    # Config file doesn't exist.
    return False

  if not uses_new_format:
    # The old format is for Docker <1.7.0.
    # Older Docker clients (<1.11.0) don't support credential helpers.
    return False
  return _CREDENTIAL_STORE_KEY in config


def ReadDockerAuthConfig():
  """Loads the authorization entries from the Docker configuration file.

  NOTE: This is public only to facilitate testing.

  Returns:
    The map of authorizations used by docker. For a new-format config this
    is the value under 'auths' (an empty dict when that key is absent); for
    an old-format config it is the whole parsed file.
  """
  # DockerConfigInfo is not used here in order to stay backward compatible
  # with UpdateDockerCredentials, which should still work if Docker is not
  # installed.
  config_path, uses_new_format = client_lib.GetDockerConfigPath()
  config = client_lib.ReadConfigurationFile(config_path)

  if not uses_new_format:
    return config
  if 'auths' in config:
    return config['auths']
  return {}


def WriteDockerAuthConfig(structure):
  """Persists a complete set of Docker authorization entries.

  This is public only to facilitate testing.

  Args:
    structure: The dict of authorization mappings to write to the
               Docker configuration file.
  """
  # DockerConfigInfo is not used here in order to stay backward compatible
  # with UpdateDockerCredentials, which should still work if Docker is not
  # installed.
  config_path, uses_new_format = client_lib.GetDockerConfigPath()
  existing_config = client_lib.ReadConfigurationFile(config_path)

  if uses_new_format:
    # Keep every other setting already in the file; replace only 'auths'.
    existing_config['auths'] = structure
    to_serialize = existing_config
  else:
    # In the old format the authorization map is the whole file.
    to_serialize = structure

  files.WriteFileAtomically(config_path, json.dumps(to_serialize, indent=2))


def UpdateDockerCredentials(server, refresh=True):
  """Refreshes the docker credentials stored for a registry.

  Obtains an access token from the active gcloud credential and registers it
  for 'server', replacing any credential already stored for that server.
  When the docker config names a credential store, 'docker login' is used;
  otherwise, or when the docker binary cannot be found, the docker config
  file is edited directly.

  Args:
    server: The hostname of the registry for which we're freshening
       the credential.
    refresh: Whether to force a token refresh on the active credential.

  Raises:
    core.credentials.exceptions.Error: There was an error loading the
      credentials.
    exceptions.Error: No access token could be obtained.
    client_lib.DockerError: 'docker login' failed for a reason other than
      the docker binary being missing.
  """
  get_token = store.GetFreshAccessToken if refresh else store.GetAccessToken
  access_token = get_token()

  if not access_token:
    raise exceptions.Error(
        'No access token could be obtained from the current credentials.')

  if not _CredentialStoreConfigured():
    _UpdateDockerConfig(server, _USERNAME, access_token)
    return

  try:
    # Let docker itself store the credentials, passing the sentinel username
    # and access token.
    DockerLogin(server, _USERNAME, access_token)
  except client_lib.DockerError as login_error:
    # Only the docker-not-found error is handled here.
    if six.text_type(login_error) != client_lib.DOCKER_NOT_FOUND_ERROR:
      raise

    # Fall back to the previous manual .dockercfg manipulation
    # in order to support gcloud app's docker-binaryless use case.
    _UpdateDockerConfig(server, _USERNAME, access_token)
    log.warning(
        "'docker' was not discovered on the path. Credentials have been "
        'stored, but are not guaranteed to work with the Docker client '
        ' if an external credential store is configured.')


def _UpdateDockerConfig(server, username, access_token):
  """Writes a username / token entry for a server into the docker config.

  Args:
    server: The registry the entry is for. It is normalized with
      client_lib.GetNormalizedURL before being used as the entry's key.
    username: The username half of the stored credential.
    access_token: The token half of the stored credential.
  """
  # NOTE: using "docker login" doesn't work as they're quite strict on what
  # is allowed in username/password.
  try:
    auth_entries = ReadDockerAuthConfig()
  except (IOError, client_lib.InvalidDockerConfigError):
    # If the file doesn't exist, start with an empty map.
    auth_entries = {}

  # Build the base64 'username:token' value stored for our server.
  raw_credential = (username + ':' + access_token).encode('ascii')
  encoded_credential = base64.b64encode(raw_credential).decode('ascii')

  # Sanitize and normalize the server input.
  normalized = client_lib.GetNormalizedURL(server)
  registry_url = normalized.geturl()
  bare_hostname = normalized.hostname

  # Clear out any unqualified stale entry for this server.
  if bare_hostname in auth_entries:
    del auth_entries[bare_hostname]

  auth_entries[registry_url] = {'auth': encoded_credential, 'email': _EMAIL}

  WriteDockerAuthConfig(auth_entries)


def _IsExpectedErrorLine(line):
  """Tells whether a stderr line is one the Docker client is known to emit.

  Args:
    line: The line received in stderr from Docker
  Returns:
    True if the line contains any of the known substrings, False otherwise.
  """
  known_fragments = (
      # --email is deprecated
      '--email',
      # login success
      'login credentials saved in',
      # Use stdin for passwords
      'WARNING! Using --password via the CLI is insecure. Use --password-stdin.',
  )
  return any(fragment in line for fragment in known_fragments)