HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/394/lib/googlecloudsdk/core/credentials/flow.py
# -*- coding: utf-8 -*- #
# Copyright 2013 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Run a web flow for oauth2."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import contextlib
import os
import select
import socket
import sys
import webbrowser
import wsgiref

from google_auth_oauthlib import flow as google_auth_flow
from googlecloudsdk.core import config
from googlecloudsdk.core import exceptions as c_exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import requests
from googlecloudsdk.core.console import console_attr
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.universe_descriptor import universe_descriptor
from googlecloudsdk.core.util import pkg_resources
from oauthlib.oauth2.rfc6749 import errors as rfc6749_errors
from requests import exceptions as requests_exceptions
import six
from six.moves import input  # pylint: disable=redefined-builtin
from six.moves.urllib import parse

_PORT_SEARCH_ERROR_MSG = (
    'Failed to start a local webserver listening on any port '
    'between {start_port} and {end_port}. Please check your '
    'firewall settings or locally running programs that may be '
    'blocking or using those ports.')

_PORT_SEARCH_START = 8085
_PORT_SEARCH_END = _PORT_SEARCH_START + 100


class Error(c_exceptions.Error):
  """Exceptions for the flow module."""


class AuthRequestRejectedError(Error):
  """Exception for when the authentication request was rejected."""


class AuthRequestFailedError(Error):
  """Exception for when the authentication request failed."""


class LocalServerCreationError(Error):
  """Exception for when a local server cannot be created."""


class LocalServerTimeoutError(Error):
  """Exception for when the local server timeout before receiving request."""


class WebBrowserInaccessible(Error):
  """Exception for when a web browser is required but not accessible."""


def RaiseProxyError(source_exc):
  six.raise_from(
      AuthRequestFailedError(
          'Could not reach the login server. A potential cause of this could be'
          ' because you are behind a proxy. Please set the environment'
          ' variables HTTPS_PROXY and HTTP_PROXY to the address of the proxy in'
          ' the format "protocol://address:port" (without quotes) and try'
          ' again.\nExample: HTTPS_PROXY=http://192.168.0.1:8080'
      ),
      source_exc,
  )


def PromptForAuthCode(message, authorize_url, client_config=None):
  ImportReadline(client_config)
  log.err.Print(message.format(url=authorize_url))
  return input(
      'Once finished, enter the verification code provided in your browser: '
  ).strip()


@contextlib.contextmanager
def HandleOauth2FlowErrors():
  """Context manager for handling errors in the OAuth 2.0 flow."""
  try:
    yield
  except requests_exceptions.ProxyError as e:
    RaiseProxyError(e)
  except (
      rfc6749_errors.AccessDeniedError,
      rfc6749_errors.InvalidGrantError,
  ) as e:
    six.raise_from(AuthRequestRejectedError(e), e)
  except rfc6749_errors.MissingTokenError:
    # The real error is swallowed by the requests-oauthlib library. The
    # exception we catch here just says "Missing access token parameter.". It's
    # not helpful, so here we raise a new error to ask the user to run with
    # --log-http to view the error response.
    e = rfc6749_errors.MissingTokenError(
        description=(
            'Token is not returned from the token endpoint. Re-run the command'
            ' with --log-http to view the error response.'
        )
    )
    raise six.raise_from(AuthRequestFailedError(e), e)
  except ValueError as e:
    raise six.raise_from(AuthRequestFailedError(e), e)
  except rfc6749_errors.OAuth2Error as e:
    raise six.raise_from(AuthRequestFailedError(e), e)


class WSGIServer(wsgiref.simple_server.WSGIServer):
  """WSGI server to handle more than one connections.

  A normal WSGI server will handle connections one-by-one. When running a local
  server to handle auth redirects, browser opens two connections. One connection
  is used to send the authorization code. The other one is opened but not used.
  Some browsers (i.e. Chrome) send data in the first connection. Other browsers
  (i.e. Safari) send data in the second connection. To make the server working
  for all these browsers, the server should be able to handle two connections
  and smartly read data from the correct connection.
  """

  # pylint: disable=invalid-name, follow the style of the base class.
  def _conn_closed(self, conn):
    """Check if conn is closed at the client side."""
    return not conn.recv(1024, socket.MSG_PEEK)

  def _handle_closed_conn(self, closed_socket, sockets_to_read,
                          client_connections):
    sockets_to_read.remove(closed_socket)
    client_connections[:] = [
        conn for conn in client_connections if conn[0] is not closed_socket
    ]
    self.shutdown_request(closed_socket)

  def _handle_new_client(self, listening_socket, socket_to_read,
                         client_connections):
    request, client_address = listening_socket.accept()
    client_connections.append((request, client_address))
    socket_to_read.append(request)

  def _handle_non_data_conn(self, data_conn, client_connections):
    for request, _ in client_connections:
      if request is not data_conn:
        self.shutdown_request(request)

  def _find_data_conn_with_client_address(self, data_conn, client_connections):
    for request, client_address in client_connections:
      if request is data_conn:
        return request, client_address

  def _find_data_conn(self):
    """Finds the connection which will be used to send data."""
    sockets_to_read = [self.socket]
    client_connections = []
    while True:
      sockets_ready_to_read, _, _ = select.select(sockets_to_read, [], [])
      for s in sockets_ready_to_read:
        # Listening socket is ready to accept client.
        if s is self.socket:
          self._handle_new_client(s, sockets_to_read, client_connections)
        else:
          if self._conn_closed(s):
            self._handle_closed_conn(s, sockets_to_read, client_connections)
          # Found the connection which will be used to send data.
          else:
            self._handle_non_data_conn(s, client_connections)
            return self._find_data_conn_with_client_address(
                s, client_connections)

  # pylint: enable=invalid-name

  def handle_request(self):
    """Handle one request."""
    request, client_address = self._find_data_conn()
    # The following section largely copies the
    # socketserver.BaseSever._handle_request_noblock.
    if self.verify_request(request, client_address):
      try:
        self.process_request(request, client_address)
      except Exception:  # pylint: disable=broad-except
        self.handle_error(request, client_address)
        self.shutdown_request(request)
      except:
        self.shutdown_request(request)
        raise
    else:
      self.shutdown_request(request)


_LOCALHOST = 'localhost'


class InstalledAppFlow(
    six.with_metaclass(abc.ABCMeta, google_auth_flow.InstalledAppFlow)):
  """Base class of authorization flow for installed app.

  Attributes:
    oauth2session: requests_oauthlib.OAuth2Session, The OAuth 2.0 session from
      requests_oauthlib.
    client_type: str, The client type, either "web" or "installed".
    client_config: The client configuration in the Google client secrets format.
    autogenerate_code_verifier: bool, If true, auto-generate a code verifier.
    require_local_server: bool, True if this flow needs a local server to handle
      redirect.
  """

  def __init__(self,
               oauth2session,
               client_type,
               client_config,
               redirect_uri=None,
               code_verifier=None,
               autogenerate_code_verifier=False,
               require_local_server=False):
    session = requests.GetSession(session=oauth2session)
    super(InstalledAppFlow, self).__init__(
        session,
        client_type,
        client_config,
        redirect_uri=redirect_uri,
        code_verifier=code_verifier,
        autogenerate_code_verifier=autogenerate_code_verifier)
    self.original_client_config = client_config
    if require_local_server:
      self.host = _LOCALHOST
      self.app = _RedirectWSGIApp()
      self.server = CreateLocalServer(self.app, self.host, _PORT_SEARCH_START,
                                      _PORT_SEARCH_END)
      self.redirect_uri = 'http://{}:{}/'.format(self.host,
                                                 self.server.server_port)
    elif redirect_uri:
      self.redirect_uri = redirect_uri
    else:
      self.redirect_uri = self._OOB_REDIRECT_URI
    # include_client_id should be set to True for 1P, and False for 3P.
    self.include_client_id = self.client_config.get('3pi') is None

  def Run(self, **kwargs):
    with HandleOauth2FlowErrors():
      return self._Run(**kwargs)

  @abc.abstractmethod
  def _Run(self, **kwargs):
    pass

  @property
  def _for_adc(self):
    """If the flow is for application default credentials."""
    return (
        self.client_config.get('is_adc')
        or self.client_config.get('client_id') != config.CLOUDSDK_CLIENT_ID
    )

  @property
  def _target_command(self):
    if self._for_adc:
      return 'gcloud auth application-default login'
    else:
      return 'gcloud auth login'

  @classmethod
  def FromInstalledAppFlow(cls, source_flow):
    """Creates an instance of the current flow from an existing flow."""
    return cls.from_client_config(
        source_flow.original_client_config,
        source_flow.oauth2session.scope,
        autogenerate_code_verifier=source_flow.autogenerate_code_verifier)


class FullWebFlow(InstalledAppFlow):
  """The complete OAuth 2.0 authorization flow.

  This class supports user account login using "gcloud auth login" with browser.
  Specifically, it does the following:
    1. Try to find an available port for the local server which handles the
       redirect.
    2. Create a WSGI app on the local server which can direct browser to
       Google's confirmation pages for authentication.
  """

  def __init__(self,
               oauth2session,
               client_type,
               client_config,
               redirect_uri=None,
               code_verifier=None,
               autogenerate_code_verifier=False):
    super(FullWebFlow, self).__init__(
        oauth2session,
        client_type,
        client_config,
        redirect_uri=redirect_uri,
        code_verifier=code_verifier,
        autogenerate_code_verifier=autogenerate_code_verifier,
        require_local_server=True)

  def _Run(self, **kwargs):
    """Run the flow using the server strategy.

    The server strategy instructs the user to open the authorization URL in
    their browser and will attempt to automatically open the URL for them.
    It will start a local web server to listen for the authorization
    response. Once authorization is complete the authorization server will
    redirect the user's browser to the local web server. The web server
    will get the authorization code from the response and shutdown. The
    code is then exchanged for a token.

    Args:
        **kwargs: Additional keyword arguments passed through to
          "authorization_url".

    Returns:
        google.oauth2.credentials.Credentials: The OAuth 2.0 credentials
          for the user.

    Raises:
      LocalServerTimeoutError: If the local server handling redirection timeout
        before receiving the request.
      AuthRequestFailedError: If the user did not consent to the required
        cloud-platform scope.
    """
    auth_url, _ = self.authorization_url(**kwargs)

    webbrowser.open(auth_url, new=1, autoraise=True)

    authorization_prompt_message = (
        'Your browser has been opened to visit:\n\n    {url}\n')
    log.err.Print(authorization_prompt_message.format(url=auth_url))
    self.server.handle_request()
    self.server.server_close()

    if not self.app.last_request_uri:
      raise LocalServerTimeoutError(
          'Local server timed out before receiving the redirection request.')
    # Note: using https here because oauthlib requires that
    # OAuth 2.0 should only occur over https.
    authorization_response = self.app.last_request_uri.replace(
        'http:', 'https:')

    # TODO(b/204953716): Remove verify=None
    os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1'
    self.fetch_token(
        authorization_response=authorization_response,
        include_client_id=self.include_client_id,
        verify=None,
    )
    del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
    self._CheckScopes()
    return self.credentials

  def _CheckScopes(self):
    """Checks requested scopes and granted scopes."""
    orig_scope = list(self.oauth2session.scope)
    granted_scope = self.oauth2session.token.scope.split(' ')
    missing_scope = frozenset(orig_scope) - frozenset(granted_scope)

    if 'https://www.googleapis.com/auth/cloud-platform' in missing_scope:
      raise AuthRequestFailedError(
          'https://www.googleapis.com/auth/cloud-platform scope is required but'
          ' not consented. Please run the login command again and consent in'
          ' the login page.'
      )

    if missing_scope:
      log.status.write(
          'You have consented to only few of the requested scopes, so'
          ' some features may not work as expected. If you would like to give'
          ' consent to all scopes, you can run the login command again.'
          f' Requested scopes: {orig_scope}.\nScopes you consented for:'
          f' {granted_scope}.\nMissing scopes: {list(missing_scope)}.'
      )
      # self.credentials' scope comes from self.oauth2session.scope, so here we
      # update the oauth2session scope to the granted scope, so self.credentials
      # will have the correct scope.
      self.oauth2session.scope = granted_scope


# TODO(b/206804357): Remove OOB flow from gcloud.
class OobFlow(InstalledAppFlow):
  """Out-of-band flow.

  This class supports user account login using "gcloud auth login" without
  browser.
  """

  def __init__(self,
               oauth2session,
               client_type,
               client_config,
               redirect_uri=None,
               code_verifier=None,
               autogenerate_code_verifier=False):
    super(OobFlow, self).__init__(
        oauth2session,
        client_type,
        client_config,
        redirect_uri=redirect_uri,
        code_verifier=code_verifier,
        autogenerate_code_verifier=autogenerate_code_verifier,
        require_local_server=False)

  def _Run(self, **kwargs):
    """Run the flow using the console strategy.

    The console strategy instructs the user to open the authorization URL
    in their browser. Once the authorization is complete the authorization
    server will give the user a code. The user then must copy & paste this
    code into the application. The code is then exchanged for a token.

    Args:
        **kwargs: Additional keyword arguments passed through to
          "authorization_url".

    Returns:
        google.oauth2.credentials.Credentials: The OAuth 2.0 credentials
          for the user.
    """
    kwargs.setdefault('prompt', 'consent')
    auth_url, _ = self.authorization_url(**kwargs)

    authorization_prompt_message = (
        'Go to the following link in your browser:\n\n    {url}\n')
    code = PromptForAuthCode(authorization_prompt_message, auth_url)
    # TODO(b/204953716): Remove verify=None
    self.fetch_token(code=code, include_client_id=True, verify=None)

    return self.credentials


class UrlManager(object):
  """A helper for url manipulation."""

  def __init__(self, url):
    self._parse_url = parse.urlparse(url)
    self._scheme, self._netloc, self._path, self._query = (
        self._parse_url.scheme, self._parse_url.netloc, self._parse_url.path,
        self._parse_url.query)
    self._parsed_query = parse.parse_qsl(self._query)

  def UpdateQueryParams(self, query_params):
    """Updates query params in the url using query_params.

    Args:
       query_params: A list of two-element tuples. The first element in the
         tuple is the query key and the second element is the query value.
    """
    for key, value in query_params:
      self._RemoveQueryParam(key)
      self._parsed_query.append((key, value))

  def RemoveQueryParams(self, query_keys):
    """Removes query params from the url.

    Args:
      query_keys: A list of query keys to remove.
    """
    for p in query_keys:
      self._RemoveQueryParam(p)

  def _RemoveQueryParam(self, query_key):
    self._parsed_query[:] = [p for p in self._parsed_query if p[0] != query_key]

  def ContainQueryParams(self, query_keys):
    """If the url contains the query keys in query_key.

    Args:
      query_keys: A list of query keys to check in the url.

    Returns:
      True if all query keys in query_keys are contained in url. Otherwise,
        return False.
    """
    parsed_query_keys = {k for (k, v) in self._parsed_query}
    return all([p in parsed_query_keys for p in query_keys])

  def GetQueryParam(self, query_key):
    """Gets the value of the query_key.

    Args:
       query_key: str, A query key to get the value for.

    Returns:
      The value of the query_key. None if query_key does not exist in the url.
    """
    for k, v in self._parsed_query:
      if query_key == k:
        return v

  def GetUrl(self):
    """Gets the current url in the string format."""
    encoded_query = parse.urlencode(self._parsed_query)
    return parse.urlunparse(
        (self._scheme, self._netloc, self._path, '', encoded_query, ''))

  def GetPort(self):
    try:
      _, port = self._netloc.rsplit(':', 1)
      return int(port)
    except ValueError:
      return None


_REQUIRED_QUERY_PARAMS_IN_AUTH_RESPONSE = ('state', 'code')

_AUTH_RESPONSE_ERR_MSG = (
    'The provided authorization response is invalid. Expect a url '
    'with query parameters of [{}].'.format(
        ', '.join(_REQUIRED_QUERY_PARAMS_IN_AUTH_RESPONSE)))


def _ValidateAuthResponse(auth_response):
  if UrlManager(auth_response).ContainQueryParams(
      _REQUIRED_QUERY_PARAMS_IN_AUTH_RESPONSE):
    return
  raise AuthRequestFailedError(_AUTH_RESPONSE_ERR_MSG)


def PromptForAuthResponse(helper_msg, prompt_msg, client_config=None):
  ImportReadline(client_config)
  log.err.Print(helper_msg)
  log.err.Print('\n')
  return input(prompt_msg).strip()


def ImportReadline(client_config):
  if (
      client_config is not None
      and '3pi' in client_config
      and (sys.platform.startswith('dar') or sys.platform.startswith('linux'))
  ):
    # Importing readline alters the built-in input() method
    # to use the GNU readline interface.
    # The basic OSX input() has an input limit of 1024 characters,
    # which is sometimes not enough for us.
    import readline  # pylint: disable=unused-import, g-import-not-at-top


class NoBrowserFlow(InstalledAppFlow):
  """Flow to authorize gcloud on a machine without access to web browsers.

  Out-of-band flow (OobFlow) is deprecated. This flow together with the helper
  flow NoBrowserHelperFlow is the replacement. gcloud in
  environments without access to browsers (i.e. access via ssh) can use this
  flow to authorize gcloud. This flow will print authorization parameters
  which will be taken by the helper flow to build the final authorization
  request. The helper flow (run by a gcloud instance
  with access to browsers) will launch the browser and ask for user's
  authorization. After the authorization, the helper flow will print the
  authorization response to pass back to this flow to continue the process
  (exchanging for the refresh/access tokens).
  """

   # These _REQUIRED_VERSIONs are used in the --no-browser flows, which are
   # used when interacting with the CLI and using a different machine.
   # That other machine's version might be out of date.
  _REQUIRED_GCLOUD_VERSION_FOR_TPC = '506.0.0'
  _REQUIRED_GCLOUD_VERSION_FOR_BYOID = '420.0.0'
  _REQUIRED_GCLOUD_VERSION = '372.0.0'
  _HELPER_MSG = ('You are authorizing {target} without access to a web '
                 'browser. Please run the following command on a machine with '
                 'a web browser and copy its output back here. Make sure the '
                 'installed gcloud version is {version} or newer.\n\n'
                 '{command} --remote-bootstrap="{partial_url}"')
  _PROMPT_MSG = 'Enter the output of the above command: '

  def __init__(self,
               oauth2session,
               client_type,
               client_config,
               redirect_uri=None,
               code_verifier=None,
               autogenerate_code_verifier=False):
    super(NoBrowserFlow, self).__init__(
        oauth2session,
        client_type,
        client_config,
        redirect_uri=redirect_uri,
        code_verifier=code_verifier,
        autogenerate_code_verifier=autogenerate_code_verifier,
        require_local_server=False)

  def _PromptForAuthResponse(self, partial_url):
    if not self._for_adc:
      target = 'gcloud CLI'
      command = 'gcloud auth login'
    else:
      target = 'client libraries'
      command = 'gcloud auth application-default login'

    universe_domain_property = properties.VALUES.core.universe_domain
    if (
        universe_domain_property is not None
        and properties.VALUES.core.universe_domain.Get()
        != universe_domain_property.default
    ):
      required_gcloud_version = self._REQUIRED_GCLOUD_VERSION_FOR_TPC
    elif self.client_config.get('3pi'):
      required_gcloud_version = self._REQUIRED_GCLOUD_VERSION_FOR_BYOID
    else:
      required_gcloud_version = self._REQUIRED_GCLOUD_VERSION

    helper_msg = self._HELPER_MSG.format(
        target=target,
        version=required_gcloud_version,
        command=command,
        partial_url=partial_url,
    )

    return PromptForAuthResponse(
        helper_msg, self._PROMPT_MSG, self.client_config
    )

  def _Run(self, **kwargs):
    auth_url, _ = self.authorization_url(**kwargs)
    url_manager = UrlManager(auth_url)
    # redirect_uri needs to be provided by the helper flow because the helper
    # will dynamically select a port on its localhost to handle redirect.
    url_manager.RemoveQueryParams(['redirect_uri'])
    # token_usage=remote is to indicate that the authorization is to bootstrap a
    # a different gcloud instance.
    url_manager.UpdateQueryParams([('token_usage', 'remote')])
    auth_response = self._PromptForAuthResponse(url_manager.GetUrl())
    _ValidateAuthResponse(auth_response)
    redirect_port = UrlManager(auth_response).GetPort()
    # Even though we started the local service using "localhost" as host name,
    # system may use a different name. So, the host name in the auth
    # response may not be "localhost". However, we should ignore it and keep
    # using "localhost" as the redirect_uri in token exchange because it is
    # what was used during authorization.
    self.redirect_uri = 'http://{}:{}/'.format(_LOCALHOST, redirect_port)

    # include_client_id should be set to True for 1P, and False for 3P.
    include_client_id = self.client_config.get('3pi') is None
    # TODO(b/204953716): Remove verify=None
    self.fetch_token(
        authorization_response=auth_response,
        include_client_id=include_client_id,
        verify=None,
    )
    return self.credentials


class NoBrowserHelperFlow(InstalledAppFlow):
  """Helper flow for the NoBrowserFlow to help another gcloud to authorize.

  This flow takes the authorization parameters (i.e. requested scopes) generated
  by the NoBrowserFlow and launches the browser for users to authorize.
  After users authorize, print the authorization response which will be taken
  by NoBrowserFlow to continue the login process
  (exchanging for refresh/access token).
  """

  _COPY_AUTH_RESPONSE_INSTRUCTION = (
      'Copy the following line back to the gcloud CLI waiting to continue '
      'the login flow.')
  _COPY_AUTH_RESPONSE_WARNING = (
      '{bold}WARNING: The following line enables access to your Google Cloud '
      'resources. Only copy it to the trusted machine that you ran the '
      '`{command} --no-browser` command on earlier.{normal}')
  _PROMPT_TO_CONTINUE_MSG = (
      'DO NOT PROCEED UNLESS YOU ARE BOOTSTRAPPING GCLOUD '
      'ON A TRUSTED MACHINE WITHOUT A WEB BROWSER AND THE ABOVE COMMAND WAS '
      'THE OUTPUT OF `{command} --no-browser` FROM THE TRUSTED MACHINE.')

  def __init__(self,
               oauth2session,
               client_type,
               client_config,
               redirect_uri=None,
               code_verifier=None,
               autogenerate_code_verifier=False):
    super(NoBrowserHelperFlow, self).__init__(
        oauth2session,
        client_type,
        client_config,
        redirect_uri=redirect_uri,
        code_verifier=code_verifier,
        autogenerate_code_verifier=autogenerate_code_verifier,
        require_local_server=True)
    self.partial_auth_url = None

  @property
  def _for_adc(self):
    client_id = UrlManager(self.partial_auth_url).GetQueryParam('client_id')
    return client_id != config.CLOUDSDK_CLIENT_ID

  def _PrintCopyInstruction(self, auth_response):
    con = console_attr.GetConsoleAttr()

    log.status.write(self._COPY_AUTH_RESPONSE_INSTRUCTION + ' ')
    log.status.Print(
        self._COPY_AUTH_RESPONSE_WARNING.format(
            bold=con.GetFontCode(bold=True),
            command=self._target_command,
            normal=con.GetFontCode()))
    log.status.write('\n')
    log.status.Print(auth_response)

  def _ShouldContinue(self):
    """Ask users to confirm before actually running the flow."""
    return console_io.PromptContinue(
        self._PROMPT_TO_CONTINUE_MSG.format(command=self._target_command),
        prompt_string='Proceed',
        default=False)

  def _Run(self, **kwargs):
    self.partial_auth_url = kwargs.pop('partial_auth_url')
    auth_url_manager = UrlManager(self.partial_auth_url)
    auth_url_manager.UpdateQueryParams([('redirect_uri', self.redirect_uri)] +
                                       list(kwargs.items()))
    auth_url = auth_url_manager.GetUrl()
    if not self._ShouldContinue():
      return
    webbrowser.open(auth_url, new=1, autoraise=True)

    authorization_prompt_message = (
        'Your browser has been opened to visit:\n\n    {url}\n')
    log.err.Print(authorization_prompt_message.format(url=auth_url))
    self.server.handle_request()
    self.server.server_close()

    if not self.app.last_request_uri:
      raise LocalServerTimeoutError(
          'Local server timed out before receiving the redirection request.')
    # Note: using https here because oauthlib requires that
    # OAuth 2.0 should only occur over https.
    authorization_response = self.app.last_request_uri.replace(
        'http:', 'https:')
    self._PrintCopyInstruction(authorization_response)


class RemoteLoginWithAuthProxyFlow(InstalledAppFlow):
  """Flow to authorize gcloud on a machine without access to web browsers.

  Out-of-band flow (OobFlow) is deprecated. gcloud in
  environments without access to browsers (eg. access via ssh) can use this
  flow to authorize gcloud. This flow will print a url which the user has to
  copy to a browser in any machine and perform authorization. After the
  authorization, the user is redirected to gcloud's auth proxy which displays
  the auth code. User copies the auth code back to gcloud to continue the
  process (exchanging auth code for the refresh/access tokens).
  """

  def __init__(self,
               oauth2session,
               client_type,
               client_config,
               redirect_uri=None,
               code_verifier=None,
               autogenerate_code_verifier=False):
    super(RemoteLoginWithAuthProxyFlow, self).__init__(
        oauth2session,
        client_type,
        client_config,
        redirect_uri=redirect_uri,
        code_verifier=code_verifier,
        autogenerate_code_verifier=autogenerate_code_verifier,
        require_local_server=False)

  def _Run(self, **kwargs):
    """Run the flow using the console strategy.

    The console strategy instructs the user to open the authorization URL
    in their browser. Once the authorization is complete the authorization
    server will give the user a code. The user then must copy & paste this
    code into the application. The code is then exchanged for a token.

    Args:
        **kwargs: Additional keyword arguments passed through to
          "authorization_url".

    Returns:
        google.oauth2.credentials.Credentials: The OAuth 2.0 credentials
          for the user.
    """

    kwargs.setdefault('prompt', 'consent')
    # when the parameter token_usage=remote is present, the DUSI of the token is
    # not attached to the local device whose browser is used to provide consent.
    kwargs.setdefault('token_usage', 'remote')
    auth_url, _ = self.authorization_url(**kwargs)

    authorization_prompt_message = (
        'Go to the following link in your browser, and complete the sign-in'
        ' prompts:\n\n    {url}\n'
    )

    code = PromptForAuthCode(
        authorization_prompt_message, auth_url, self.client_config
    )

    # TODO(b/204953716): Remove verify=None
    self.fetch_token(
        code=code, include_client_id=self.include_client_id, verify=None
    )

    return self.credentials


def CreateLocalServer(wsgi_app, host, search_start_port, search_end_port):
  """Creates a local wsgi server.

  Finds an available port in the range of [search_start_port, search_end_point)
  for the local server.

  Args:
    wsgi_app: A wsgi app running on the local server.
    host: hostname of the server.
    search_start_port: int, the port where the search starts.
    search_end_port: int, the port where the search ends.

  Raises:
    LocalServerCreationError: If it cannot find an available port for
      the local server.

  Returns:
    WSGISever, a wsgi server.
  """
  port = search_start_port
  local_server = None
  while not local_server and port < search_end_port:
    try:
      local_server = wsgiref.simple_server.make_server(
          host,
          port,
          wsgi_app,
          server_class=WSGIServer,
          handler_class=google_auth_flow._WSGIRequestHandler)  # pylint:disable=protected-access
    except (socket.error, OSError):
      port += 1
  if local_server:
    return local_server
  raise LocalServerCreationError(
      _PORT_SEARCH_ERROR_MSG.format(
          start_port=search_start_port, end_port=search_end_port - 1))


class _RedirectWSGIApp(object):
  """WSGI app to handle the authorization redirect.

  Stores the request URI and responds with a confirmation page.
  """

  def __init__(self):
    self.last_request_uri = None

  def __call__(self, environ, start_response):
    """WSGI Callable.

    Args:
        environ (Mapping[str, Any]): The WSGI environment.
        start_response (Callable[str, list]): The WSGI start_response callable.

    Returns:
        Iterable[bytes]: The response body.
    """
    start_response(
        six.ensure_str('200 OK'),
        [(six.ensure_str('Content-type'), six.ensure_str('text/html'))])
    self.last_request_uri = wsgiref.util.request_uri(environ)
    query = self.last_request_uri.split('?', 1)[-1]
    query = dict(parse.parse_qsl(query))

    if 'code' in query:
      page = 'oauth2_landing.html'
    else:
      page = 'oauth2_landing_error.html'

    page_string = pkg_resources.GetResource(__name__, page)
    if not properties.IsDefaultUniverse():
      # decode and replace cloud.google.com with the universe document domain
      page_string = bytes(
          bytes(page_string)
          .decode('utf-8')
          .replace(
              'cloud.google.com',
              universe_descriptor.GetUniverseDocumentDomain(),
          ),
          'utf-8',
      )
    return [page_string]