HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/util/ssh/ssh.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""SSH client utilities for key-generation, dispatching the ssh commands etc."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import datetime
import enum
import errno
import getpass
import os
import re
import string
import subprocess
import tempfile
import textwrap

from apitools.base.py import exceptions
from googlecloudsdk.api_lib.oslogin import client as oslogin_client
from googlecloudsdk.calliope import base
from googlecloudsdk.command_lib.oslogin import oslogin_utils
from googlecloudsdk.command_lib.util import gaia
from googlecloudsdk.core import config
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import execution_utils
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.console import console_io
from googlecloudsdk.core.credentials import creds as c_creds
from googlecloudsdk.core.credentials import exceptions as creds_exceptions
from googlecloudsdk.core.credentials import store as c_store
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import platforms
from googlecloudsdk.core.util import retry
import six
from six.moves.urllib.parse import quote


PER_USER_SSH_CONFIG_FILE = os.path.join('~', '.ssh', 'config')
SECURITY_KEY_DIR = os.path.join('~', '.ssh', 'google_compute_engine_sk')
CERTIFICATE_DIR = os.path.join('~', '.ssh', 'google_compute_engine_cert')
OSLOGIN_ENABLE_METADATA_KEY = 'enable-oslogin'
OSLOGIN_ENABLE_2FA_METADATA_KEY = 'enable-oslogin-2fa'
OSLOGIN_ENABLE_SK_METADATA_KEY = 'enable-oslogin-sk'
OSLOGIN_ENABLE_CERTIFICATES_METADATA_KEY = 'enable-oslogin-certificates'


class InvalidKeyError(core_exceptions.Error):
  """Indicates a key file was not found."""


class MissingCommandError(core_exceptions.Error):
  """Indicates that an external executable couldn't be found."""


class CommandError(core_exceptions.Error):
  """Raise for a failure when invoking ssh, scp, or similar."""

  def __init__(self, cmd, message=None, return_code=None):
    if not (message or return_code):
      raise ValueError('One of message or return_code is required.')

    self.cmd = cmd

    message_text = '[{0}]'.format(message) if message else None
    return_code_text = (
        'return code [{0}]'.format(return_code) if return_code else None
    )
    why_failed = ' and '.join(
        [f for f in [message_text, return_code_text] if f]
    )

    super(CommandError, self).__init__(
        '[{0}] exited with {1}.'.format(self.cmd, why_failed),
        exit_code=return_code,
    )


class InvalidConfigurationError(core_exceptions.Error):
  """When arguments provided have misconfigured sources/destinations."""

  def __init__(self, msg, sources, destination):
    super(InvalidConfigurationError, self).__init__(
        msg + '  Got sources: {}, destination: {}'.format(sources, destination)
    )


class BadCharacterError(core_exceptions.Error):
  """Indicates a character was found that couldn't be escaped."""


class Suite(enum.Enum):
  """Represents an SSH implementation suite."""

  OPENSSH = 'OpenSSH'
  PUTTY = 'PuTTY'


class Environment(object):
  """Environment maps SSH commands to executable location on file system.

    Recommended usage:

    env = Environment.Current()
    env.RequireSSH()
    cmd = [env.ssh, 'user@host']

  An attribute which is None indicates that the executable couldn't be found.

  Attributes:
    suite: Suite, The suite for this environment.
    bin_path: str, The path where the commands are located. If None, use
      standard `$PATH`.
    ssh: str, Location of ssh command (or None if not found).
    ssh_term: str, Location of ssh terminal command (or None if not found), for
      interactive sessions.
    scp: str, Location of scp command (or None if not found).
    keygen: str, Location of the keygen command (or None if not found).
    ssh_exit_code: int, Exit code indicating SSH command failure.
  """

  # Each suite supports ssh and non-interactive ssh, scp and keygen.
  COMMANDS = {
      Suite.OPENSSH: {
          'ssh': 'ssh',
          'ssh_term': 'ssh',
          'scp': 'scp',
          'keygen': 'ssh-keygen',
      },
      Suite.PUTTY: {
          'ssh': 'plink',
          'ssh_term': 'putty',
          'scp': 'pscp',
          'keygen': 'winkeygen',
      },
  }

  # Exit codes indicating that the `ssh` command (not remote) failed
  SSH_EXIT_CODES = {
      Suite.OPENSSH: 255,
      Suite.PUTTY: 1,  # Only `plink`, `putty` always gives 0
  }

  def __init__(self, suite, bin_path=None):
    """Create a new environment by supplying a suite and command directory.

    Args:
      suite: Suite, the suite for this environment.
      bin_path: str, the path where the commands are located. If None, use
        standard $PATH.
    """
    self.suite = suite
    self.bin_path = bin_path
    self.ssh = None
    self.ssh_term = None
    self.scp = None
    self.keygen = None
    for key, cmd in six.iteritems(self.COMMANDS[suite]):
      setattr(self, key, files.FindExecutableOnPath(cmd, path=self.bin_path))
    self.ssh_exit_code = self.SSH_EXIT_CODES[suite]

  def SupportsSSH(self):
    """Whether all SSH commands are supported.

    Returns:
      True if and only if all commands are supported, else False.
    """
    return all((self.ssh, self.ssh_term, self.scp, self.keygen))

  def RequireSSH(self):
    """Simply raises an error if any SSH command is not supported.

    Raises:
      MissingCommandError: One or more of the commands were not found.
    """
    if not self.SupportsSSH():
      raise MissingCommandError('Your platform does not support SSH.')

  @classmethod
  def Current(cls):
    """Retrieve the current environment.

    Returns:
      Environment, the active and current environment on this machine.
    """
    if platforms.OperatingSystem.IsWindows():
      suite = Suite.PUTTY
      bin_path = _SdkHelperBin()
    else:
      suite = Suite.OPENSSH
      bin_path = None
    return Environment(suite, bin_path)


def _IsValidSshUsername(user):
  # All characters must be ASCII, and no spaces are allowed
  # This may grant false positives, but will prevent backwards-incompatible
  # behavior.
  return all(ord(c) < 128 and c != ' ' for c in user)


class KeyFileStatus(enum.Enum):
  PRESENT = 'OK'
  ABSENT = 'NOT FOUND'
  BROKEN = 'BROKEN'


class _KeyFileKind(enum.Enum):
  """List of supported (by gcloud) key file kinds."""

  PRIVATE = 'private'
  PUBLIC = 'public'
  PPK = 'PuTTY PPK'


class Keys(object):
  """Manages private and public SSH key files.

  This class manages the SSH public and private key files, and verifies
  correctness of them. A Keys object is instantiated with a path to a
  private key file. The public key locations are inferred by the private
  key file by simply appending a different file ending (`.pub` and `.ppk`).

  If the keys are broken or do not yet exist, the EnsureKeysExist method
  can be utilized to shell out to the system SSH keygen and write new key
  files.

  By default, there is an SSH key for the gcloud installation,
  `DEFAULT_KEY_FILE` which should likely be used. Note that SSH keys are
  generated and managed on a per-installation basis. Strictly speaking,
  there is no 1:1 relationship between installation and user account.

  Verifies correctness of key files:
   - Populates list of SSH key files (key pair, ppk key on Windows).
   - Checks if files are present and (to basic extent) correct.
   - Can remove broken key (if permitted by user).
   - Provides status information.
  """

  DEFAULT_KEY_FILE = os.path.join('~', '.ssh', 'google_compute_engine')

  class PublicKey(object):
    """Represents a public key.

    Attributes:
      key_type: str, Key generation type, e.g. `ssh-rsa` or `ssh-dss`.
      key_data: str, Base64-encoded key data.
      comment: str, Non-semantic comment, may be empty string or contain spaces.
    """

    def __init__(self, key_type, key_data, comment=''):
      self.key_type = key_type
      self.key_data = key_data
      self.comment = comment

    @classmethod
    def FromKeyString(cls, key_string):
      """Construct a public key from a typical OpenSSH-style key string.

      Args:
        key_string: str, on the format `TYPE DATA [COMMENT]`. Example: `ssh-rsa
          ABCDEF me@host.com`.

      Raises:
        InvalidKeyError: The public key file does not contain key (heuristic).

      Returns:
        Keys.PublicKey, the parsed public key.
      """
      # We get back a unicode list of keys for the remaining metadata, so
      # convert to unicode. Assume UTF 8, but if we miss a character we can just
      # replace it with a '?'. The only source of issues would be the hostnames,
      # which are relatively inconsequential.
      decoded_key = key_string.strip()
      if isinstance(key_string, six.binary_type):
        decoded_key = decoded_key.decode('utf-8', 'replace')
      parts = decoded_key.split(' ', 2)
      if len(parts) < 2:
        raise InvalidKeyError('Public key [{}] is invalid.'.format(key_string))
      comment = parts[2].strip() if len(parts) > 2 else ''  # e.g. `me@host`
      return cls(parts[0], parts[1], comment)

    def ToEntry(self, include_comment=False):
      """Format this key into a text entry.

      Args:
        include_comment: str, Include the comment part in this entry.

      Returns:
        str, A key string on the form `TYPE DATA` or `TYPE DATA COMMENT`.
      """
      out_format = '{type} {data}'
      if include_comment and self.comment:
        out_format += ' {comment}'
      return out_format.format(
          type=self.key_type, data=self.key_data, comment=self.comment
      )

  class KeyFileData(object):

    def __init__(self, filename):
      # We keep filename as file handle. Filesystem race is impossible to avoid
      # in this design as we spawn a subprocess and pass in filename.
      # TODO(b/33288605) fix it.
      self.filename = filename
      self.status = None

  def __init__(self, key_file, env=None):
    """Create a Keys object which manages the given files.

    Args:
      key_file: str, The file path to the private SSH key file (other files are
        derived from this name). Automatically handles symlinks and user
        expansion.
      env: Environment, Current environment or None to infer from current.
    """
    private_key_file = os.path.realpath(files.ExpandHomeDir(key_file))
    self.dir = os.path.dirname(private_key_file)
    self.env = env or Environment.Current()
    self.keys = {
        _KeyFileKind.PRIVATE: self.KeyFileData(private_key_file),
        _KeyFileKind.PUBLIC: self.KeyFileData(private_key_file + '.pub'),
    }
    if self.env.suite is Suite.PUTTY:
      self.keys[_KeyFileKind.PPK] = self.KeyFileData(private_key_file + '.ppk')

  @classmethod
  def FromFilename(cls, filename=None, env=None):
    """Create Keys object given a file name.

    Args:
      filename: str or None, the name to the file or DEFAULT_KEY_FILE if None
      env: Environment, Current environment or None to infer from current.

    Returns:
      Keys, an instance which manages the keys with the given name.
    """
    return cls(filename or Keys.DEFAULT_KEY_FILE, env)

  @property
  def key_file(self):
    """Filename of the private key file."""
    return self.keys[_KeyFileKind.PRIVATE].filename

  def _StatusMessage(self):
    """Prepares human readable SSH key status information."""
    messages = []
    key_padding = 0
    status_padding = 0
    for kind in self.keys:
      data = self.keys[kind]
      key_padding = max(key_padding, len(kind.value))
      status_padding = max(status_padding, len(data.status.value))
    for kind in self.keys:
      data = self.keys[kind]
      messages.append(
          '{} {} [{}]\n'.format(
              (kind.value + ' key').ljust(key_padding + 4),
              ('(' + data.status.value + ')').ljust(status_padding + 2),
              data.filename,
          )
      )
    messages.sort()
    return ''.join(messages)

  def Validate(self):
    """Performs minimum key files validation.

    Note that this is a simple best-effort parser intended for machine
    generated keys. If the file has been user modified, there's a risk
    of both false positives and false negatives.

    Returns:
      KeyFileStatus.PRESENT if key files meet minimum requirements.
      KeyFileStatus.ABSENT if neither private nor public keys exist.
      KeyFileStatus.BROKEN if there is some key, but it is broken or incomplete.
    """

    def ValidateFile(kind):
      status_or_line = self._WarnOrReadFirstKeyLine(
          self.keys[kind].filename, kind.value
      )
      if isinstance(status_or_line, KeyFileStatus):
        return status_or_line
      else:  # returned line - present
        self.keys[kind].first_line = status_or_line
        return KeyFileStatus.PRESENT

    for file_kind in self.keys:
      self.keys[file_kind].status = ValidateFile(file_kind)

    # The remaining checks are for the public key file.

    # Additional validation for public keys.
    if self.keys[_KeyFileKind.PUBLIC].status is KeyFileStatus.PRESENT:
      try:
        self.GetPublicKey()
      except InvalidKeyError:
        log.warning(
            'The public SSH key file [{}] is corrupt.'.format(
                self.keys[_KeyFileKind.PUBLIC]
            )
        )
        self.keys[_KeyFileKind.PUBLIC].status = KeyFileStatus.BROKEN

    # Summary
    collected_values = [x.status for x in six.itervalues(self.keys)]
    if all(x == KeyFileStatus.ABSENT for x in collected_values):
      return KeyFileStatus.ABSENT
    elif all(x == KeyFileStatus.PRESENT for x in collected_values):
      return KeyFileStatus.PRESENT
    else:
      return KeyFileStatus.BROKEN

  def RemoveKeyFilesIfPermittedOrFail(self, force_key_file_overwrite=None):
    """Removes all SSH key files if user permitted this behavior.

    Precondition: The SSH key files are currently in a broken state.

    Depending on `force_key_file_overwrite`, delete all SSH key files:

    - If True, delete key files.
    - If False, cancel immediately.
    - If None and
      - interactive, prompt the user.
      - non-interactive, cancel.

    Args:
      force_key_file_overwrite: bool or None, overwrite broken key files.

    Raises:
      console_io.OperationCancelledError: Operation intentionally cancelled.
      OSError: Error deleting the broken file(s).
    """
    message = 'Your SSH key files are broken.\n' + self._StatusMessage()
    if force_key_file_overwrite is False:
      raise console_io.OperationCancelledError(message + 'Operation aborted.')
    message += 'We are going to overwrite all above files.'
    log.warning(message)
    if force_key_file_overwrite is None:
      # - Interactive when pressing 'Y', continue
      # - Interactive when pressing enter or 'N', raise OperationCancelledError
      # - Non-interactive, raise OperationCancelledError
      console_io.PromptContinue(default=False, cancel_on_no=True)

    # Remove existing broken key files.
    for key_file in six.viewvalues(self.keys):
      try:
        os.remove(key_file.filename)
      except OSError as e:
        if e.errno == errno.EISDIR:
          # key_file.filename points to a directory
          raise

  def _WarnOrReadFirstKeyLine(self, path, kind):
    """Returns the first line from the key file path.

    A None return indicates an error and is always accompanied by a log.warning
    message.

    Args:
      path: The path of the file to read from.
      kind: The kind of key file, 'private' or 'public'.

    Returns:
      None (and prints a log.warning message) if the file does not exist, is not
      readable, or is empty. Otherwise returns the first line utf8 decoded.
    """
    try:
      with files.FileReader(path) as f:
        line = f.readline().strip()
        if line:
          return line
        msg = 'is empty'
        status = KeyFileStatus.BROKEN
    except files.MissingFileError:
      msg = 'does not exist'
      status = KeyFileStatus.ABSENT
    except files.Error:
      msg = 'is not readable'
      status = KeyFileStatus.BROKEN
    log.warning('The %s SSH key file for gcloud %s.', kind, msg)
    return status

  def GetPublicKey(self):
    """Returns the public key verbatim from file as a string.

    Precondition: The public key must exist. Run Keys.EnsureKeysExist() prior.

    Raises:
      InvalidKeyError: If the public key file does not contain key (heuristic).

    Returns:
      Keys.PublicKey, a public key (that passed primitive validation).
    """
    filepath = self.keys[_KeyFileKind.PUBLIC].filename
    with files.FileReader(filepath) as f:
      # TODO(b/33467618): Currently we enforce that key exists on the first
      # line, but OpenSSH does not enforce that.
      first_line = f.readline()
      return self.PublicKey.FromKeyString(first_line)

  def EnsureKeysExist(self, overwrite, allow_passphrase=True):
    """Generate ssh key files if they do not yet exist.

    Precondition: Environment.SupportsSSH()

    Args:
      overwrite: bool or None, overwrite key files if they are broken.
      allow_passphrase: bool, if keygeneration occurs, let the user specify a
        passphrase for private key encryption. See `ssh.KeygenCommand` for
        details on when this is possible.

    Raises:
      console_io.OperationCancelledError: if interrupted by user
      CommandError: if the ssh-keygen command failed.
    """
    key_files_validity = self.Validate()

    if key_files_validity is KeyFileStatus.BROKEN:
      self.RemoveKeyFilesIfPermittedOrFail(overwrite)
      # Fallthrough
    if key_files_validity is not KeyFileStatus.PRESENT:
      if key_files_validity is KeyFileStatus.ABSENT:
        # If key is broken, message is already displayed
        log.warning('You do not have an SSH key for gcloud.')
        log.warning('SSH keygen will be executed to generate a key.')

      if not os.path.exists(self.dir):
        msg = (
            'This tool needs to create the directory [{0}] before being '
            'able to generate SSH keys.'.format(self.dir)
        )
        console_io.PromptContinue(
            message=msg,
            cancel_on_no=True,
            cancel_string='SSH key generation aborted by user.',
        )
        files.MakeDir(self.dir, 0o700)

      cmd = KeygenCommand(self.key_file, allow_passphrase=allow_passphrase)
      cmd.Run(self.env)

    if self.env.suite is Suite.PUTTY:
      # This is to fix an encoding issue with PPK's we generated that was
      # ignored in versions of PuTTY <=0.70, but became invalid in version 0.71.
      # Since this only affects the PPK, we don't need to generate a new key; we
      # can just correct the encoding of the PPK if necessary. We use a sentinel
      # file in the config dir to check if the encoding is already correct.
      valid_ppk_sentinel = config.Paths().valid_ppk_sentinel_file
      if not os.path.exists(valid_ppk_sentinel):
        if key_files_validity is KeyFileStatus.PRESENT:  # Initial validity
          cmd = KeygenCommand(
              self.key_file, allow_passphrase=False, reencode_ppk=True
          )
          cmd.Run(self.env)
        try:
          files.WriteFileContents(valid_ppk_sentinel, '')
        except files.Error as e:
          # It's possible that writing the sentinel file fails, which means
          # we'll potentially have to re-encode the PPK again the next time an
          # SSH/SCP command is run. But we shouldn't let this prevent the user
          # from running their current command.
          log.debug('Failed to create sentinel file: [{}]'.format(e))


def CertFileFromCloudRunDeployment(project, region, deployment):
  cert_dir = os.path.realpath(files.ExpandHomeDir(CERTIFICATE_DIR))
  return os.path.join(
      cert_dir,
      '{}_{}_{}-cert.pub'.format(project, region, deployment),
  )


def CertFileFromAppEngineInstance(project, service, version, instance):
  cert_dir = os.path.realpath(files.ExpandHomeDir(CERTIFICATE_DIR))
  return os.path.join(
      cert_dir,
      '{}_{}_{}_{}-cert.pub'.format(project, service, version, instance),
  )


def CertFileFromComputeInstance(project_id, zone, instance_id):
  cert_dir = os.path.realpath(files.ExpandHomeDir(CERTIFICATE_DIR))
  return os.path.join(
      cert_dir, '{}_{}_{}-cert.pub'.format(project_id, zone, instance_id)
  )


def WriteCloudRunCertificate(cert, project, region, deployment):
  """Writes a certificate associated with the key pair for Cloud Run.

  Args:
    cert: string, The SSH certificate data.
    project: string, The project ID of the instance.
    region: string, The region of the deployment.
    deployment: string, The deployment name.
  """
  cert_dir = os.path.realpath(files.ExpandHomeDir(CERTIFICATE_DIR))
  files.MakeDir(cert_dir, mode=0o700)

  filepath = CertFileFromCloudRunDeployment(project, region, deployment)
  try:
    files.WriteFileContents(filepath, cert)
  except files.Error as e:
    log.debug('Failed to update the certificate {}: [{}]'.format(filepath, e))


def WriteAppEngineCertificate(cert, project, service, version, instance):
  """Writes a certificate associated with the key pair for App Engine.

  Args:
    cert: string, The SSH certificate data.
    project: string, The project ID of the instance.
    service: string, The service of the instance.
    version: string, The version of the instance.
    instance: string, The instance ID.
  """
  cert_dir = os.path.realpath(files.ExpandHomeDir(CERTIFICATE_DIR))
  files.MakeDir(cert_dir, mode=0o700)

  filepath = CertFileFromAppEngineInstance(project, service, version, instance)
  try:
    files.WriteFileContents(filepath, cert)
  except files.Error as e:
    log.debug('Failed to update the certificate {}: [{}]'.format(filepath, e))


def WriteComputeCertificate(cert, project_id, zone, instance_id):
  """Writes a certificate associated with the key pair.

  Args:
    cert: string, The SSH certificate data.
    project_id: string, The project ID of the instance.
    zone: string, The zone of the instance.
    instance_id: string, The instance ID.
  """
  cert_dir = os.path.realpath(files.ExpandHomeDir(CERTIFICATE_DIR))
  files.MakeDir(cert_dir, mode=0o700)

  filepath = CertFileFromComputeInstance(project_id, zone, instance_id)
  try:
    files.WriteFileContents(filepath, cert)
  except files.Error as e:
    log.debug('Failed to update the certificate {}: [{}]'.format(filepath, e))


def DeleteCertificateFile(project_id, zone, instance_id):
  """Deletes an OS Login certificate file.

  Args:
    project_id: string, The project ID of the instance.
    zone: string, The zone of the instance.
    instance_id: string, The instance ID.
  """
  filepath = CertFileFromComputeInstance(project_id, zone, instance_id)
  if os.path.exists(filepath):
    os.remove(filepath)


def ValidateCertificate(
    oslogin_state,
    project_id=None,
    zone=None,
    instance_id=None,
    app_engine_params=None,
    cloud_run_params=None,
):
  """Checks if the certificate is currently valid for a given instance.

  Args:
    oslogin_state: An OsloginState object.
    project_id: string, The project ID of the instance.
    zone: string, The zone of the instance.
    instance_id: string, The instance ID.
    app_engine_params: dict, values of (appsId, servicesId, versionId,
      instanceId, serviceAccount, region) for App Engine instances.
    cloud_run_params: dict, values of (region, deployment_name, project_id) for
      Cloud Run deployments.
  """
  def IsCertValid(cert):
    time_format = '%Y-%m-%dT%H:%M:%S'
    match = re.findall(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}', cert)
    if not match:
      return
    start = datetime.datetime.strptime(match[0], time_format)
    end = datetime.datetime.strptime(match[1], time_format)
    now = datetime.datetime.now()
    oslogin_state.signed_ssh_key = now > start and now < end

  if app_engine_params:
    cert_file = CertFileFromAppEngineInstance(
        app_engine_params['appsId'],
        app_engine_params['servicesId'],
        app_engine_params['versionsId'],
        app_engine_params['instancesId'],
    )
  elif cloud_run_params:
    cert_file = CertFileFromCloudRunDeployment(
        project_id,
        cloud_run_params['region'],
        cloud_run_params['deployment_name'],
    )
  else:
    cert_file = CertFileFromComputeInstance(project_id, zone, instance_id)
  cmd = KeygenCommand(cert_file, print_cert=True)
  try:
    if os.path.exists(cert_file):
      cmd.Run(out_func=IsCertValid)
  except CommandError as e:
    log.debug('Cert File [{0}] could not be opened: {1}'.format(cert_file, e))


def WriteSecurityKeys(oslogin_state):
  """Writes temporary files from a list of key data.

  Args:
    oslogin_state: An OsloginState object.

  Returns:
    List of file paths for security keys or None if security keys are not
    supported.
  """
  # PuTTY does not support security keys, so don't attempt to write any keys.
  if oslogin_state.environment == 'putty':
    return None

  security_keys = oslogin_state.security_keys

  # If there are no security keys, just return.
  if not security_keys:
    return None

  key_dir = os.path.realpath(files.ExpandHomeDir(SECURITY_KEY_DIR))

  files.MakeDir(key_dir, mode=0o700)
  key_files = []

  # Remove existing keys
  for filename in os.listdir(key_dir):
    if filename.startswith('tmp_sk'):
      file_path = os.path.join(key_dir, filename)
      os.remove(file_path)

  # Write new keys
  for num, key in enumerate(security_keys):
    filename = 'tmp_sk_{0}'.format(num)
    file_path = os.path.join(key_dir, filename)
    files.WriteFileContents(file_path, key, private=True)
    key_files.append(file_path)

  return key_files


class KnownHosts(object):
  """Represents known hosts file, supports read, write and basic key management.

  Currently a very naive, but sufficient, implementation where each entry is
  simply a string, and all entries are list of those strings.
  """

  # TODO(b/33467618): Rename the file itself
  DEFAULT_PATH = os.path.realpath(
      files.ExpandHomeDir(
          os.path.join('~', '.ssh', 'google_compute_known_hosts')
      )
  )

  def __init__(self, known_hosts, file_path):
    """Construct a known hosts representation based on a list of key strings.

    Args:
      known_hosts: str, list each corresponding to a line in known_hosts_file.
      file_path: str, path to the known_hosts_file.
    """
    self.known_hosts = known_hosts
    self.file_path = file_path

  @classmethod
  def FromFile(cls, file_path):
    """Create a KnownHosts object given a known_hosts_file.

    Args:
      file_path: str, path to the known_hosts_file.

    Returns:
      KnownHosts object corresponding to the file. If the file could not be
      opened, the KnownHosts object will have no entries.
    """
    try:
      known_hosts = files.ReadFileContents(file_path).splitlines()
    except files.Error as e:
      known_hosts = []
      log.debug(
          'SSH Known Hosts File [{0}] could not be opened: {1}'.format(
              file_path, e
          )
      )
    return KnownHosts(known_hosts, file_path)

  @classmethod
  def FromDefaultFile(cls):
    """Create a KnownHosts object from the default known_hosts_file.

    Returns:
      KnownHosts object corresponding to the default known_hosts_file.
    """
    return KnownHosts.FromFile(KnownHosts.DEFAULT_PATH)

  def ContainsAlias(self, host_key_alias):
    """Check if a host key alias exists in one of the known hosts.

    Args:
      host_key_alias: str, the host key alias

    Returns:
      bool, True if host_key_alias is in the known hosts file. If the known
      hosts file couldn't be opened it will be treated as if empty and False
      returned.
    """
    return any(host_key_alias in line for line in self.known_hosts)

  def Add(self, hostname, host_key, overwrite=False):
    """Add or update the entry for the given hostname.

    If there is no entry for the given hostname, it will be added. If there is
    an entry already and overwrite_keys is False, nothing will be changed. If
    there is an entry and overwrite_keys is True, the key will be updated if it
    has changed.

    Args:
      hostname: str, The hostname for the known_hosts entry.
      host_key: str, The host key for the given hostname.
      overwrite: bool, If true, will overwrite the entry corresponding to
        hostname with the new host_key if it already exists. If false and an
        entry already exists for hostname, will ignore the new host_key value.
    """
    new_key_entry = '{0} {1}'.format(hostname, host_key)
    for i, key in enumerate(self.known_hosts):
      if key.startswith(hostname):
        if overwrite:
          self.known_hosts[i] = new_key_entry
        break
    else:
      self.known_hosts.append(new_key_entry)

  def AddMultiple(self, hostname, host_keys, overwrite=False):
    """Add or update multiple entries for the given hostname.

    If there is no entry for the given hostname, the keys will be added. If
    there is an entry already, and overwrite keys is False, nothing will be
    changed. If there is an entry and overwrite_keys is True, all  current
    entries for the given hostname will be removed and the new keys added.

    Args:
      hostname: str, The hostname for the known_hosts entry.
      host_keys: list, A list of host keys for the given hostname.
      overwrite: bool, If true, will overwrite the entries corresponding to
        hostname with the new host_key if it already exists. If false and an
        entry already exists for hostname, will ignore the new host_key values.

    Returns:
      bool, True if new keys were added.
    """
    new_keys_added = False
    new_key_entries = [
        '{0} {1}'.format(hostname, host_key) for host_key in host_keys
    ]
    if not new_key_entries:
      return new_keys_added
    existing_entries = [
        key for key in self.known_hosts if key.startswith(hostname)
    ]
    if existing_entries:
      if overwrite:
        self.known_hosts = [
            key for key in self.known_hosts if not key.startswith(hostname)
        ]
        self.known_hosts.extend(new_key_entries)
        new_keys_added = True
    else:
      self.known_hosts.extend(new_key_entries)
      new_keys_added = True

    return new_keys_added

  def Write(self):
    """Writes the file to disk."""
    files.WriteFileContents(
        self.file_path, '\n'.join(self.known_hosts) + '\n', private=True
    )


def GetDefaultSshUsername(warn_on_account_user=False):
  """Returns the default username for ssh.

  The default username is the local username, unless that username is invalid.
  In that case, the default username is the username portion of the current
  account.

  Emits a warning if it's not using the local account username.

  Args:
    warn_on_account_user: bool, whether to warn if using the current account
      instead of the local username.

  Returns:
    str, the default SSH username.
  """
  user = getpass.getuser()
  if not _IsValidSshUsername(user):
    full_account = properties.VALUES.core.account.Get(required=True)
    account_user = gaia.MapGaiaEmailToDefaultAccountName(full_account)
    if warn_on_account_user:
      log.warning(
          'Invalid characters in local username [{0}]. '
          'Using username corresponding to active account: [{1}]'.format(
              user, account_user
          )
      )
    user = account_user
  return user


def MetadataHasEnable(metadata, key_name):
  """Return true if the metadata has the supplied key and it is set to 'true'.

  Args:
    metadata: Instance or Project metadata.
    key_name: The name of the metadata key to check. e.g. 'oslogin-enable'.

  Returns:
    True if Enabled, False if Disabled, None if key is not presesnt.
  """
  if not (metadata and metadata.items):
    return None
  matching_values = [
      item.value for item in metadata.items if item.key == key_name
  ]
  if not matching_values:
    return None
  return matching_values[0].lower() == 'true'


def FeatureEnabledInMetadata(
    instance, project, key_name, instance_override=None
):
  """Return True if the feature associated with the supplied key is enabled.

  If the key is set to 'true' in instance metadata, will return True.
  If the key is set to 'false' in instance metadata, will return False.
  If key is not set in instance metadata, will return the value in project
  metadata unless instance_override is not None (set to True or False).

  Args:
    instance: The current instance object.
    project: The current project object.
    key_name: The name of metadata key to check. e.g. 'oslogin-enable'.
    instance_override: The value of the instance metadata key. Used if the
      instance object cannot be passed in. None if not set.

  Returns:
    bool, True if the feature associated with the supplied key is enabled
      in instance/project metadata.
  """
  feature_enabled = None
  if instance is not None:
    feature_enabled = MetadataHasEnable(instance.metadata, key_name)
  elif instance_override is not None:
    feature_enabled = instance_override
  if feature_enabled is None:
    project_metadata = project.commonInstanceMetadata
    feature_enabled = MetadataHasEnable(project_metadata, key_name)

  return feature_enabled


def CheckSshSecurityKeySupport():
  """Check the local SSH installation for security key support.

  Runs 'ssh -Q key' and looks for keys starting with 'sk-'.
  PuTTY on Windows will return False.

  Returns:
    True if SSH supports security keys, False if not, and None if support
    cannot be determined.
  """
  env = Environment.Current()

  # Security Keys are not currently supported in PuTTy.
  if env.suite == Suite.PUTTY:
    return False

  ssh_flags = ['-Q', 'key']

  cmd = SSHCommand(None, extra_flags=ssh_flags, tty=False)
  cmd_list = cmd.Build()
  log.debug(cmd_list)

  try:
    output = six.ensure_str(
        subprocess.check_output(cmd_list, stderr=subprocess.STDOUT)
    )
    log.debug(output)
  except subprocess.CalledProcessError:
    log.debug(
        'Cannot determine SSH supported key types using command: {0}'.format(
            ' '.join(cmd_list)
        )
    )
    return None

  keys_supported = output.splitlines()
  log.debug('Supported SSH key types: {0}'.format(keys_supported))

  for key in keys_supported:
    if key.startswith('sk-'):
      return True

  # Keys starting with 'sk-' not found.
  return False


class OsloginState(object):
  """Class for holding OS Login State.

  Attributes:
    oslogin_enabled: bool, True if OS Login is enabled on the instance.
    oslogin_2fa_enabled: bool, True if OS Login 2FA is enabled on the instance.
    security_keys_enabled: bool, True if Security Keys should be used for SSH
      authentication.
    user: str, The username that SSH should use for connecting.
    third_party_user: bool, True if the authenticated user is an external
      account user.
    ssh_security_key_support: bool, True if the SSH client supports security
      keys.
    environment: str, A hint about the current environment. ('ssh' or 'putty')
    security_keys: list, A list of 'private' keys associated with the security
      keys configured in the user's account.
    signed_ssh_key: bool, True if a valid signed ssh key exists.
    require_certificates: bool, True if passing a certificate is required.
  """

  def __init__(
      self,
      oslogin_enabled=False,
      oslogin_2fa_enabled=False,
      security_keys_enabled=False,
      user=None,
      third_party_user=False,
      ssh_security_key_support=None,
      environment=None,
      security_keys=None,
      signed_ssh_key=False,
      require_certificates=False,
  ):
    self.oslogin_enabled = oslogin_enabled
    self.oslogin_2fa_enabled = oslogin_2fa_enabled
    self.security_keys_enabled = security_keys_enabled
    self.user = user
    self.third_party_user = third_party_user
    self.ssh_security_key_support = ssh_security_key_support
    self.environment = environment
    if security_keys is None:
      self.security_keys = []
    else:
      self.security_keys = security_keys
    self.signed_ssh_key = signed_ssh_key
    self.require_certificates = require_certificates

  def __str__(self):
    return textwrap.dedent("""\
        OS Login Enabled: {0}
        2FA Enabled: {1}
        Security Keys Enabled: {2}
        Username: {3}
        Third Party User: {4}
        SSH Security Key Support: {5}
        Environment: {6}
        Security Keys:
        {7}
        Signed SSH Key: {8}
        Require Certificates: {9}
        """).format(
        self.oslogin_enabled,
        self.oslogin_2fa_enabled,
        self.security_keys_enabled,
        self.user,
        self.third_party_user,
        self.ssh_security_key_support,
        self.environment,
        '\n'.join(self.security_keys),
        self.signed_ssh_key,
        self.require_certificates,
    )

  def __repr__(self):
    return (
        'OsloginState(oslogin_enabled={0}, oslogin_2fa_enabled={1}, '
        'security_keys_enabled={2}, user={3}, third_party_user={4}'
        'ssh_security_key_support={5}, environment={6}, '
        'security_keys={7}, signed_ssh_key={8}, '
        'require_certificates={9})'.format(
            self.oslogin_enabled,
            self.oslogin_2fa_enabled,
            self.security_keys_enabled,
            self.user,
            self.third_party_user,
            self.ssh_security_key_support,
            self.environment,
            self.security_keys,
            self.signed_ssh_key,
            self.require_certificates,
        )
    )


def _IsInstanceWindows(instance, messages=None):
  if not instance or not instance.disks or not messages:
    return False

  boot_disk = next(iter(filter(lambda disk: disk.boot, instance.disks)), None)
  guest_os_features = boot_disk.guestOsFeatures if boot_disk else []
  features = [feature.type for feature in guest_os_features or []]
  return (
      features
      and messages.GuestOsFeature.TypeValueValuesEnum.WINDOWS in features
  )


def _SignAndWriteCloudRunCertificate(
    oslogin,
    public_key,
    cloud_run_params,
):
  """Signs and writes a certificate for the given Cloud Run service."""
  project_id = cloud_run_params['project_id']
  region = cloud_run_params['region']
  deployment_name = cloud_run_params['deployment_name']
  sign_response = oslogin.SignSshPublicKeyForInstance(
      public_key,
      project_id,
      region,
      cloud_run_deployment=(
          f'projects/{project_id}/locations/{region}/services/{deployment_name}'
      ),
  )
  WriteCloudRunCertificate(
      sign_response.signedSshPublicKey,
      project_id,
      region,
      deployment_name,
  )


def _SignAndWriteAppEngineCertificate(
    oslogin,
    public_key,
    project,
    region,
    app_engine_params,
):
  """Signs and writes a certificate for the given App Engine instance."""
  sign_response = oslogin.SignSshPublicKeyForInstance(
      public_key,
      project.name,
      region,
      service_account=app_engine_params['serviceAccount'],
      app_engine_instance=(
          f'apps/{app_engine_params["appsId"]}/services/{app_engine_params["servicesId"]}/versions/{app_engine_params["versionsId"]}/instances/{app_engine_params["instancesId"]}'
      ),
  )
  WriteAppEngineCertificate(
      sign_response.signedSshPublicKey,
      app_engine_params['appsId'],
      app_engine_params['servicesId'],
      app_engine_params['versionsId'],
      app_engine_params['instancesId'],
  )


def _SignAndWriteComputeCertificate(
    oslogin,
    public_key,
    project,
    region,
    zone,
    instance,
):
  """Signs and writes a certificate for the given Compute instance."""
  sign_response = oslogin.SignSshPublicKeyForInstance(
      public_key,
      project.name,
      region,
      service_account=instance.serviceAccounts[0].email
      if instance.serviceAccounts
      else '',
      compute_instance=(
          f'projects/{project.name}/zones/{zone}/instances/{instance.id}'
      ),
  )
  WriteComputeCertificate(
      sign_response.signedSshPublicKey, project.name, zone, instance.id
  )


def GetOsloginState(
    instance,
    project,
    requested_user,
    public_key,
    expiration_time,
    release_track,
    app_engine_params=None,
    cloud_run_params=None,
    username_requested=False,
    instance_enable_oslogin=None,
    instance_enable_2fa=None,
    instance_enable_security_keys=None,
    instance_require_certificates=None,
    messages=None,
):
  """Check instance/project metadata for oslogin and return updated username.

  Check to see if OS Login is enabled in metadata and if it is, return
  the OS Login user and a boolean value indicating if OS Login is being used.

  Args:
    instance: instance, The object representing the instance we are connecting
      to. If None, instance metadata will be ignored.
    project: project, The object representing the current project.
    requested_user: str, The default or requested username to connect as.
    public_key: str, The public key of the user connecting.
    expiration_time: int, Microseconds after epoch when the ssh key should
      expire. If None, an existing key will not be modified and a new key will
      not be set to expire.  If not None, an existing key may be modified with
      the new expiry.
    release_track: release_track, The object representing the release track.
    app_engine_params: dict, The fields required to identify an App Engine
      instance. This should be None for Compute instances and Cloud Run
      deployments. If present, this should contain fields for 'appsId',
      'servicesId', 'versionsId', 'instancesId', and 'serviceAccount'.
    cloud_run_params: dict, The fields required to identify a Cloud Run
      deployment. This should be None for Compute and App Engine instances. If
      present, this should contain fields for 'deployment_name', 'project_id',
      and 'region'.
    username_requested: bool, True if the user has passed a specific username in
      the args.
    instance_enable_oslogin: True if the instance's metadata indicates that OS
      Login is enabled, and False if not enabled. Used when the instance cannot
      be passed through the instance argument. None if not specified.
    instance_enable_2fa: True if the instance's metadata indicates that OS Login
      2FA is enabled, and False if not enabled. Used when the instance cannot be
      passed through the instance argument. None if not specified.
    instance_enable_security_keys: True if the instance's metadata indicates
      that OS Login security keys are enabled, and False if not enabled. Used
      when the instance cannot be passed through the instance argument. None if
      not specified.
    instance_require_certificates: True if the instance's metadata indicates
      that OS Login SSH certificates are to be used exclusively, False
      otherwise. An override to be used when the instance cannot be passed
      through the "instance" argument. None if not specified.
    messages: API messages class, The compute API messages.

  Returns:
    object, An object containing the OS Login state, with values indicating
      whether OS Login is enabled, Security Keys are enabled, the username to
      connect as and a list of security keys.
  """
  oslogin_state = OsloginState(user=requested_user)

  if cloud_run_params:
    oslogin_enabled = True
  else:
    oslogin_enabled = FeatureEnabledInMetadata(
        instance,
        project,
        OSLOGIN_ENABLE_METADATA_KEY,
        instance_override=instance_enable_oslogin,
    )

  if not oslogin_enabled:
    # OS Login is disabled by default.
    return oslogin_state

  if _IsInstanceWindows(instance, messages=messages):
    log.status.Print(
        'OS Login is not available on Windows VMs.\nUsing ssh metadata.'
    )
    return oslogin_state

  oslogin_state.oslogin_enabled = oslogin_enabled
  oslogin_state.third_party_user = IsThirdPartyUser()

  if cloud_run_params:
    oslogin_state.oslogin_2fa_enabled = False
    oslogin_state.security_keys_enabled = False
    oslogin_state.require_certificates = True
  else:
    oslogin_state.oslogin_2fa_enabled = FeatureEnabledInMetadata(
        instance,
        project,
        OSLOGIN_ENABLE_2FA_METADATA_KEY,
        instance_override=instance_enable_2fa,
    )
    oslogin_state.security_keys_enabled = FeatureEnabledInMetadata(
        instance,
        project,
        OSLOGIN_ENABLE_SK_METADATA_KEY,
        instance_override=instance_enable_security_keys,
    )
    oslogin_state.require_certificates = FeatureEnabledInMetadata(
        instance,
        project,
        OSLOGIN_ENABLE_CERTIFICATES_METADATA_KEY,
        instance_override=instance_require_certificates,
    )

  env = Environment.Current()
  if env.suite == Suite.PUTTY:
    oslogin_state.environment = 'putty'
  else:
    oslogin_state.environment = 'ssh'

  if oslogin_state.security_keys_enabled:
    oslogin_state.ssh_security_key_support = CheckSshSecurityKeySupport()
  oslogin = oslogin_client.OsloginClient(release_track)
  user_email = (
      properties.VALUES.auth.impersonate_service_account.Get()
      or properties.VALUES.core.account.Get()
  )

  zone = None
  region = None
  if instance and instance.zone:
    zone = instance.zone.split('/').pop()
    # Inclusively trim suffix from last '-' to convert a zone into a region.
    region = zone[: zone.rindex('-')]
  elif app_engine_params:
    region = app_engine_params['region']

  if release_track in [
      base.ReleaseTrack.ALPHA,
      base.ReleaseTrack.BETA,
  ] and (oslogin_state.third_party_user or oslogin_state.require_certificates):
    user_email = quote(user_email, safe=':@')
    if app_engine_params:
      ValidateCertificate(oslogin_state, app_engine_params)
    elif cloud_run_params:
      ValidateCertificate(
          oslogin_state,
          cloud_run_params=cloud_run_params,
          project_id=cloud_run_params['project_id'],
      )
    else:
      ValidateCertificate(oslogin_state, project.name, zone, instance.id)
    if not oslogin_state.signed_ssh_key:
      try:
        if app_engine_params:
          _SignAndWriteAppEngineCertificate(
              oslogin, public_key, project, region, app_engine_params
          )
        elif cloud_run_params:
          _SignAndWriteCloudRunCertificate(
              oslogin,
              public_key,
              cloud_run_params,
          )
        else:
          _SignAndWriteComputeCertificate(
              oslogin, public_key, project, region, zone, instance
          )
      except exceptions.HttpNotFoundError:
        log.status.Print(
            'No OS Login profile found for user [{0}] for project [{1}].'
            ' Creating POSIX account.'.format(user_email, project.name)
        )
        oslogin.ProvisionPosixAccount(user_email, project.name, region)
        if app_engine_params:
          _SignAndWriteAppEngineCertificate(
              oslogin, public_key, project, region, app_engine_params
          )
        elif cloud_run_params:
          _SignAndWriteCloudRunCertificate(
              oslogin, public_key, cloud_run_params
          )
        else:
          _SignAndWriteComputeCertificate(
              oslogin, public_key, project, region, zone, instance
          )
    login_profile = oslogin.GetLoginProfile(
        user_email,
        cloud_run_params['project_id'] if cloud_run_params else project.name,
        include_security_keys=oslogin_state.security_keys_enabled,
    )
  # Check to see if public key is already in profile and POSIX information
  # exists associated with the project. If either are not set, import an SSH
  # public key. Otherwise update the expiration time if needed.
  else:
    if oslogin_state.third_party_user:
      raise NotImplementedError(
          'SSH using federated workforce identities is not yet generally '
          'available (GA). Please use `gcloud beta compute ssh` to SSH using '
          'a third-party identity.'
      )
    if oslogin_state.require_certificates:
      raise NotImplementedError(
          'SSH using certificates is not yet generally available (GA). Please'
          ' use `gcloud beta compute ssh` to SSH to VMs that require'
          ' certificate authentication.'
      )
    login_profile = oslogin.GetLoginProfile(
        user_email,
        cloud_run_params['project_id'] if cloud_run_params else project.name,
        include_security_keys=oslogin_state.security_keys_enabled,
    )
    if oslogin_state.security_keys_enabled:
      oslogin_state.security_keys = oslogin_utils.GetSecurityKeysFromProfile(
          user_email, oslogin, profile=login_profile
      )
      if not login_profile.posixAccounts:
        import_response = oslogin.ImportSshPublicKey(user_email, '')
        login_profile = import_response.loginProfile
    else:
      keys = oslogin_utils.GetKeyDictionaryFromProfile(
          user_email, oslogin, profile=login_profile
      )
      fingerprint = oslogin_utils.FindKeyInKeyList(public_key, keys)
      if not fingerprint or not login_profile.posixAccounts:
        import_response = oslogin.ImportSshPublicKey(
            user_email,
            public_key,
            expiration_time=expiration_time,
            include_security_keys=False,
            region=region,
        )
        login_profile = import_response.loginProfile
      elif expiration_time:
        oslogin.UpdateSshPublicKey(
            user_email,
            fingerprint,
            keys[fingerprint],
            'expirationTimeUsec',
            expiration_time=expiration_time,
        )

  # Get the username for the oslogin user. If the username is the same as the
  # default user, return that one. Otherwise, return the 'primary' username.
  # If no 'primary' exists, return the first username.
  oslogin_user = None
  for pa in login_profile.posixAccounts:
    oslogin_user = oslogin_user or pa.username
    if pa.username == requested_user:
      return oslogin_state
    elif pa.primary:
      oslogin_user = pa.username

  oslogin_state.user = oslogin_user

  # If the user passed in a specific username to the command, show a message
  # to the user, otherwise just add a message to the log.
  if username_requested:
    log.status.Print(
        'Using OS Login user [{0}] instead of requested user [{1}]'.format(
            oslogin_user, requested_user
        )
    )
  else:
    log.info(
        'Using OS Login user [{0}] instead of default user [{1}]'.format(
            oslogin_user, requested_user
        )
    )
  return oslogin_state


def IsThirdPartyUser():
  """Checks if the Authenticated User is a BYOID User."""
  try:
    creds = c_store.LoadFreshCredential(use_google_auth=True)
  except creds_exceptions.Error:
    log.debug('Failed to load fresh credentials.')
    return False
  creds_type = c_creds.CredentialTypeGoogleAuth.FromCredentials(creds)
  return creds_type in (
      c_creds.CredentialTypeGoogleAuth.EXTERNAL_ACCOUNT,
      c_creds.CredentialTypeGoogleAuth.EXTERNAL_ACCOUNT_USER,
      c_creds.CredentialTypeGoogleAuth.EXTERNAL_ACCOUNT_AUTHORIZED_USER,
  )


def ParseAndSubstituteSSHFlags(
    args, remote, instance_address, internal_address
):
  """Obtain extra flags from the command arguments."""
  extra_flags = []
  if args.ssh_flag:
    for flag in args.ssh_flag:
      if flag and flag != '--':  # Skip any empty flags.
        for flag_part in flag.split():  # We want grouping here
          deref_flag = flag_part

          # Call replace only when it is necessary. It will lower the chance
          # replace crashes because the replacing value is None.

          if '%USER%' in deref_flag:
            deref_flag = deref_flag.replace('%USER%', remote.user)

          if '%INSTANCE%' in deref_flag:
            deref_flag = deref_flag.replace('%INSTANCE%', instance_address)

          if '%INTERNAL%' in deref_flag:
            deref_flag = deref_flag.replace('%INTERNAL%', internal_address)

          extra_flags.append(deref_flag)
  return extra_flags


def _SdkHelperBin():
  """Returns the SDK helper executable bin directory."""
  # TODO(b/33467618): Remove this method?
  return os.path.join(config.Paths().sdk_root, 'bin', 'sdk')


class Remote(object):
  """A reference to an SSH remote, consisting of a host and user.

  Hashing and equality methods are implemented for this class,
  so remotes can be put in sets for de-duplication.

  Attributes:
    user: str or None, SSH user name (optional).
    host: str or None, Host name.
  """

  # A remote has two parts `[user@]host`, where `user` is optional.
  #   A user:
  #   - cannot contain ':', '@'
  #   A host:
  #   - cannot start with '.'
  #   - cannot contain ':', '/', '\\', '@'
  # This regular expression matches if and only if the above requirements are
  # satisfied. The capture groups are (user, host) where `user` will be
  # None if omitted.
  _REMOTE_REGEX = re.compile(r'^(?:([^:@]+)@)?([^.:/\\@][^:/\\@]*)$')

  def __init__(self, host, user=None):
    """Constructor for FileReference.

    Args:
      host: str or None, Host name.
      user: str or None, SSH user name.
    """
    self.host = host
    self.user = user

  def ToArg(self):
    """Convert to a positional argument, in the form expected by `ssh`/`plink`.

    Returns:
      str, A string on the form `[user@]host`.
    """
    return self.user + '@' + self.host if self.user else self.host

  @classmethod
  def FromArg(cls, arg):
    """Convert an SSH-style positional argument to a remote.

    Args:
      arg: str, A path on the canonical ssh form `[user@]host`.

    Returns:
      Remote, the constructed object or None if arg is malformed.
    """
    match = cls._REMOTE_REGEX.match(arg)
    if match:
      user, host = match.groups()
      return cls(host, user=user)
    else:
      return None

  def __hash__(self):
    return hash(self.ToArg())

  def __eq__(self, other):
    return type(self) is type(other) and self.ToArg() == other.ToArg()

  def __ne__(self, other):
    return not self.__eq__(other)

  def __repr__(self):
    return self.ToArg()


def _EscapeProxyCommandArg(s, env):
  """Returns s escaped such that it can be a ProxyCommand arg.

  Args:
    s: str, Argument to escape. Must be non-empty.
    env: Environment, data about the ssh client.

  Raises:
    BadCharacterError: If s contains a bad character.
  """
  for c in s:
    if not 0x20 <= ord(c) < 0x7F:
      # For ease of implementation we ban control characters and non-ASCII.
      raise BadCharacterError(
          (
              "Special character %r (part of %r) couldn't be escaped for "
              'ProxyCommand'
          )
          % (c, s)
      )
  if env.suite is Suite.PUTTY:
    # When using proxycmd with putty or plink, 3 unescapes happen:
    # 1 putty/plink does command line -> argv unescape.
    # 2 putty/plink does backslash and percent unescape.
    # 3 Inner gcloud python binary does command line -> argv unescape.
    #
    # We reverse this, doing escapes in reverse order, doing 3, 2.
    # We don't do the 1 escape here because that's done later inside the
    # subprocess.Popen() function.
    s = _EscapeWindowsArgvElement(s)
    s = _EscapePuttyBackslashPercent(s)
    return s
  # When using ProxyCommand with OpenSSH, 2 unescapes happen:
  # 1 OpenSSH does percent unescape.
  # 2 bash does unescape.
  # We do the corresponding escapes in reverse.
  return _EscapeForBash(s).replace('%', '%%')


def _EscapeWindowsArgvElement(s):
  """Returns s escaped such that it can be passed to a windows executable.

  Args:
    s: str, What to escape. Must be ASCII and non-control.
  """
  # Each Windows binary can unescape its commandline arguments to argv how it
  # wants, but they tend to behave like this:
  # https://docs.microsoft.com/en-us/cpp/cpp/parsing-cpp-command-line-arguments?view=vs-2017
  # We escape in that format because that format is similar to how python (of
  # the inner gcloud) does it. The primary difference is what happens when
  # inside a doublequoted string there is an even number of backslashes
  # (possibly 0) then at least 2 doublequotes. This function never returns a
  # string like that, so it avoids the ambiguity.
  #
  # We escape in reverse, because that's easiest.
  result = []
  # Whether (in the reversed input) we are following a non-broken chain of
  # backslashes (possibly 0-length) after a doublequote.
  # The final output will have a doublequote appended to the end, so the first
  # character of the reversed input is considered to follow a doublequote.
  following_quote = True
  for c in s[::-1]:
    if c == '"':
      result.append('"\\')
      following_quote = True
    elif c == '\\':
      if following_quote:
        result.append('\\\\')
      else:
        result.append('\\')
    else:
      result.append(c)
      following_quote = False
  return '"' + ''.join(result)[::-1] + '"'


def _EscapePuttyBackslashPercent(s):
  # s must be ASCII and non-control.
  # The putty unescaping is documented at
  # https://the.earth.li/~sgtatham/putty/0.70/htmldoc/Chapter4.html#config-proxy-command
  return s.replace('\\', '\\\\').replace('%', '%%')


def _EscapeForBash(s):
  """Returns s escaped so it can be used as a single bash argument.

  Args:
    s: str, What to escape. Must be ASCII, non-control, and non-empty.
  """
  # From https://stackoverflow.com/q/15783701
  good_chars = set(string.ascii_letters + string.digits + '%+-./:=@_')
  result = []
  for c in s:
    if c in good_chars:
      result.append(c)
    else:
      result.append('\\' + c)
  return ''.join(result)


def _BuildIapTunnelProxyCommandArgs(iap_tunnel_args, env):
  """Calculate the ProxyCommand flags for IAP Tunnel if necessary.

  IAP Tunnel with ssh runs an second inner version of gcloud by passing a
  command to do so as a ProxyCommand argument to OpenSSH/Putty.

  Args:
    iap_tunnel_args: iap_tunnel.SshTunnelArgs or None, options about IAP Tunnel.
    env: Environment, data about the ssh client.

  Raises:
    BadCharacterError: If instance arg contains any invalid characters.

  Returns:
    [str], the additional arguments for OpenSSH or Putty.
  """
  if not iap_tunnel_args:
    return []

  # A relaxed validation of the passed in instance name / IP / hostname.
  # Mostly so potentially malicious names don't get passed in the resulting
  # generated command.
  allowed_non_alnum_chars = {'-', '_', '.'}
  for char in iap_tunnel_args.instance:
    if not char.isalnum() and char not in allowed_non_alnum_chars:
      raise BadCharacterError(
          'Instance name/IP/hostname contains illegal characters.'
      )

  gcloud_command = execution_utils.ArgsForGcloud()
  # Applying _EscapeProxyCommandArg to the first item (the python executable
  # path) doesn't make 100% sense on Windows, because the full unescaping only
  # happens to arguments, not to the executable path. But this escaping will be
  # correct as long as the python executable path doesn't contain a doublequote
  # or end with a backslash, which should never happen.
  gcloud_command = [_EscapeProxyCommandArg(x, env) for x in gcloud_command]
  # track, project, zone, instance, verbosity should only contain
  # characters that don't need escaping, so don't bother escaping them.
  if iap_tunnel_args.track:
    gcloud_command.append(iap_tunnel_args.track)
  # Windows CMD only accepts double quotes.
  port_token, quotation = (
      ('%port', '\"') if env.suite is Suite.PUTTY else ('%p', '\'')
  )
  gcloud_command.extend([
      'compute',
      'start-iap-tunnel',
      quotation + iap_tunnel_args.instance + quotation,
      quotation + port_token + quotation,
      '--listen-on-stdin',
      '--project=' + iap_tunnel_args.project,
  ])
  if iap_tunnel_args.zone:
    gcloud_command.append('--zone=' + iap_tunnel_args.zone)
  if iap_tunnel_args.region:
    gcloud_command.append('--region=' + iap_tunnel_args.region)
  if iap_tunnel_args.network:
    gcloud_command.append('--network=' + iap_tunnel_args.network)
  for arg in iap_tunnel_args.pass_through_args:
    gcloud_command.append(_EscapeProxyCommandArg(arg, env))

  verbosity = log.GetVerbosityName()
  if verbosity:
    gcloud_command.append('--verbosity=' + verbosity)

  if env.suite is Suite.PUTTY:
    return ['-proxycmd', ' '.join(gcloud_command)]
  else:
    return [
        '-o',
        ' '.join(['ProxyCommand'] + gcloud_command),
        '-o',
        'ProxyUseFdpass=no',
    ]


class KeygenCommand(object):
  """Platform independent SSH client key generation command.

  For OpenSSH, we use `ssh-keygen(1)`. For PuTTY, we use a custom binary.
  Currently, the only supported algorithm is 'rsa'. The command generates the
  following files:
  - `<identity_file>`: Private key, on OpenSSH format (possibly encrypted).
  - `<identity_file>.pub`: Public key, on OpenSSH format.
  - `<identity_file>.ppk`: Unencrypted PPK key-pair, on PuTTY format.

  The PPK-file is only generated from a PuTTY environment, and encodes the same
  private- and public keys as the other files.

  Attributes:
    identity_file: str, path to private key file.
    allow_passphrase: bool, If True, attempt at prompting the user for a
      passphrase for private key encryption, given that the following conditions
      are also true: - Running in an OpenSSH environment (Linux and Mac) -
      Running in interactive mode (from an actual TTY) - Prompts are enabled in
      gcloud
    reencode_ppk: bool, If True, reencode the PPK file if it was generated with
      a bad encoding, instead of generating a new key. This is only valid for
      PuTTY.
    print_cert: bool, If True, ssh-keygen will print certificate information.
  """

  def __init__(self, identity_file, allow_passphrase=True, reencode_ppk=False,
               print_cert=False):
    """Construct a suite independent `ssh-keygen` command."""
    self.identity_file = identity_file
    self.allow_passphrase = allow_passphrase
    self.reencode_ppk = reencode_ppk
    self.print_cert = print_cert

  def Build(self, env=None):
    """Construct the actual command according to the given environment.

    Args:
      env: Environment, to construct the command for (or current if None).

    Raises:
      MissingCommandError: If keygen command was not found.

    Returns:
      [str], the command args (where the first arg is the command itself).
    """
    env = env or Environment.Current()
    if not env.keygen:
      raise MissingCommandError(
          'Keygen command not found in the current environment.'
      )
    args = [env.keygen]
    if env.suite is Suite.OPENSSH:
      if self.print_cert:
        args.extend(['-L', '-f', self.identity_file])
      else:
        prompt_passphrase = self.allow_passphrase and console_io.CanPrompt()
        if not prompt_passphrase:
          args.extend(['-N', ''])  # Empty passphrase
        args.extend(['-t', 'rsa', '-f', self.identity_file])
    else:
      if self.reencode_ppk:
        args.append('--reencode-ppk')
      args.append(self.identity_file)

    return args

  def Run(self, env=None, out_func=None):
    """Run the keygen command in the given environment.

    Args:
      env: Environment, environment to run in (or current if None).
      out_func: str->None: A function call with the stdout of ssh-keygen.

    Raises:
      MissingCommandError: Keygen command not found.
      CommandError: Keygen command failed.
    """
    args = self.Build(env)
    log.debug('Running command [{}].'.format(' '.join(args)))
    status = execution_utils.Exec(args, no_exit=True, out_func=out_func)
    if status:
      raise CommandError(args[0], return_code=status)


class SSHCommand(object):
  """Represents a platform independent SSH command.

  This class is intended to manage the most important suite- and platform
  specifics. We manage the following data:
  - The executable to call, either `ssh`, `putty` or `plink`.
  - User and host, through the `remote` arg.
  - Potential remote command to execute, `remote_command` arg.

  In addition, it manages these flags:
  -t, -T      Pseudo-terminal allocation
  -p, -P      Port
  -i          Identity file (private key)
  -o Key=Val  OpenSSH specific options that should be added, `options` arg.

  For flexibility, SSHCommand also accepts `extra_flags`. Always use these
  with caution -- they will be added as-is to the command invocation without
  validation. Specifically, do not add any of the above mentioned flags.
  """

  def __init__(
      self,
      remote,
      port=None,
      identity_file=None,
      cert_file=None,
      options=None,
      extra_flags=None,
      remote_command=None,
      tty=None,
      iap_tunnel_args=None,
      remainder=None,
      identity_list=None,
  ):
    """Construct a suite independent SSH command.

    Note that `extra_flags` and `remote_command` arguments are lists of strings:
    `remote_command=['echo', '-e', 'hello']` is different from
    `remote_command=['echo', '-e hello']` -- the former is likely desired.
    For the same reason, `extra_flags` should be passed like `['-k', 'v']`.

    Args:
      remote: Remote, the remote to connect to.
      port: str, port.
      identity_file: str, path to private key file.
      cert_file: str, path to certificate file.
      options: {str: str}, options (`-o`) for OpenSSH, see `ssh_config(5)`.
      extra_flags: [str], extra flags to append to ssh invocation. Both binary
        style flags `['-b']` and flags with values `['-k', 'v']` are accepted.
      remote_command: [str], command to run remotely.
      tty: bool, launch a terminal. If None, determine automatically based on
        presence of remote command.
      iap_tunnel_args: iap_tunnel.SshTunnelArgs or None, options about IAP
        Tunnel.
      remainder: [str], NOT RECOMMENDED. Arguments to be appended directly to
        the native tool invocation, after the `[user@]host` part but prior to
        the remote command. On PuTTY, this can only be a remote command. On
        OpenSSH, this can be flags followed by a remote command. Cannot be
        combined with `remote_command`. Use `extra_flags` and `remote_command`
        instead.
      identity_list: list, A list of paths to private key files. Overrides the
        identity_file argument, and sets multiple `['-i']` flags.
    """
    self.remote = remote
    self.port = port
    self.identity_file = identity_file
    self.cert_file = cert_file
    self.identity_list = identity_list
    self.options = options or {}
    self.extra_flags = extra_flags or []
    self.remote_command = remote_command or []
    self.tty = tty
    self.iap_tunnel_args = iap_tunnel_args
    self.remainder = remainder
    self._remote_command_file = None

  def Build(self, env=None):
    """Construct the actual command according to the given environment.

    Args:
      env: Environment, to construct the command for (or current if None).

    Raises:
      MissingCommandError: If SSH command(s) required were not found.

    Returns:
      [str], the command args (where the first arg is the command itself).
    """
    env = env or Environment.Current()
    if not (env.ssh and env.ssh_term):
      raise MissingCommandError('The current environment lacks SSH.')

    tty = self.tty if self.tty in [True, False] else not self.remote_command
    args = [env.ssh_term, '-t'] if tty else [env.ssh, '-T']

    if self.port:
      port_flag = '-P' if env.suite is Suite.PUTTY else '-p'
      args.extend([port_flag, self.port])

    if self.cert_file:
      args.extend(['-o CertificateFile={}'.format(self.cert_file)])

    if self.identity_list:
      for identity_file in self.identity_list:
        args.extend(['-i', identity_file])
    elif self.identity_file:
      identity_file = self.identity_file
      if env.suite is Suite.PUTTY and not identity_file.endswith('.ppk'):
        identity_file += '.ppk'
      args.extend(['-i', identity_file])

    if env.suite is Suite.OPENSSH:
      # Always, always deterministic order
      for key, value in sorted(six.iteritems(self.options)):
        args.extend(['-o', '{k}={v}'.format(k=key, v=value)])

    args.extend(_BuildIapTunnelProxyCommandArgs(self.iap_tunnel_args, env))
    args.extend(self.extra_flags)
    if self.remote:
      args.append(self.remote.ToArg())

    # TODO(b/38179637): Remove when compute have separated flags from
    # positionals.
    if self.remainder:
      args.extend(self.remainder)

    if self.remote_command:
      if env.suite is Suite.OPENSSH:
        args.append('--')
        args.extend(self.remote_command)
      elif tty:
        # Unlike plink.exe, putty.exe doesn't support passing a remote command
        # with spaces on the command line; however the -m flag lets one pass in
        # a local file from which to read the remote command.
        with tempfile.NamedTemporaryFile(
            mode='w+t', delete=False
        ) as self._remote_command_file:
          self._remote_command_file.write(' '.join(self.remote_command))
        args.extend(['-m', self._remote_command_file.name])
      else:
        args.extend(self.remote_command)
    return args

  def Run(
      self,
      env=None,
      putty_force_connect=False,
      explicit_output_file=None,
      explicit_error_file=None,
      explicit_input_file=None,
  ):
    """Run the SSH command using the given environment.

    Args:
      env: Environment, environment to run in (or current if None).
      putty_force_connect: bool, whether to inject 'y' into the prompts for
        `plink`, which is insecure and not recommended. It serves legacy
        compatibility purposes for existing usages only; DO NOT SET THIS IN NEW
        CODE.
      explicit_output_file: Pipe stdout into this file-like object
      explicit_error_file: Pipe stderr into this file-like object
      explicit_input_file: Pipe stdin from this file-like object

    Raises:
      MissingCommandError: If SSH command(s) not found.
      CommandError: SSH command failed (not to be confused with the eventual
        failure of the remote command).

    Returns:
      int, The exit code of the remote command, forwarded from the client.
    """
    env = env or Environment.Current()
    args = self.Build(env)
    log.debug('Running command [{}].'.format(' '.join(args)))
    # PuTTY and friends always ask on fingerprint mismatch
    if env.suite is Suite.PUTTY and putty_force_connect:
      in_str = 'y\n'
    else:
      in_str = None

    # We pipe stdout to a specific file
    extra_popen_kwargs = {}
    if explicit_output_file:
      extra_popen_kwargs['stdout'] = explicit_output_file
    if explicit_error_file:
      extra_popen_kwargs['stderr'] = explicit_error_file
    if explicit_input_file:
      extra_popen_kwargs['stdin'] = explicit_input_file

    status = execution_utils.Exec(
        args, no_exit=True, in_str=in_str, **extra_popen_kwargs
    )

    # This should only happen if we're using putty.exe with a remote command. In
    # that case the file created earlier via NamedTemporaryFile has to be
    # deleted manually due to https://bugs.python.org/issue14243 on Windows.
    if self._remote_command_file:
      try:
        os.remove(self._remote_command_file.name)
      except OSError:
        # Not worth crashing over.
        log.debug(
            'Failed to delete remote command file [{}]'.format(
                self._remote_command_file.name
            )
        )
        pass

    if status == env.ssh_exit_code:
      raise CommandError(args[0], return_code=status)
    return status


class SCPCommand(object):
  """Represents a platform independent SCP command.

  This class is intended to manage the most important suite- and platform
  specifics. We manage the following data:
  - The executable to call, either `scp` or `pscp`.
  - User and host, through either `sources` or `destination` arg. Multiple
    remote sources are allowed but not supported under PuTTY. Multiple local
    sources are always allowed.
  - Potential remote command to execute, `remote_command` arg.

  In addition, it manages these flags:
  -r          Recursive copy
  -C          Compression
  -P          Port
  -i          Identity file (private key)
  -o Key=Val  OpenSSH specific options that should be added, `options` arg.

  For flexibility, SCPCommand also accepts `extra_flags`. Always use these
  with caution -- they will be added as-is to the command invocation without
  validation. Specifically, do not add any of the above mentioned flags.
  """

  def __init__(
      self,
      sources,
      destination,
      recursive=False,
      compress=False,
      port=None,
      identity_file=None,
      cert_file=None,
      options=None,
      extra_flags=None,
      iap_tunnel_args=None,
      identity_list=None,
  ):
    """Construct a suite independent SCP command.

    Args:
      sources: [FileReference] or FileReference, the source(s) for this copy. At
        least one source is required. NOTE: Multiple remote sources are not
        supported in PuTTY and is discouraged for consistency.
      destination: FileReference, the destination file or directory. If remote
        source, this must be local, and vice versa.
      recursive: bool, recursive directory copy.
      compress: bool, enable compression.
      port: str, port.
      identity_file: str, path to private key file.
      cert_file: str, path to OpenSSH certificate file.
      options: {str: str}, options (`-o`) for OpenSSH, see `ssh_config(5)`.
      extra_flags: [str], extra flags to append to scp invocation. Both binary
        style flags `['-b']` and flags with values `['-k', 'v']` are accepted.
      iap_tunnel_args: iap_tunnel.SshTunnelArgs or None, options about IAP
        Tunnel.
      identity_list: list, A list of paths to private key files. Overrides the
        identity_file argument, and sets multiple `['-i']` flags.
    """
    self.sources = [sources] if isinstance(sources, FileReference) else sources
    self.destination = destination
    self.recursive = recursive
    self.compress = compress
    self.port = port
    self.identity_file = identity_file
    self.cert_file = cert_file
    self.identity_list = identity_list
    self.options = options or {}
    self.extra_flags = extra_flags or []
    self.iap_tunnel_args = iap_tunnel_args

  @classmethod
  def Verify(cls, sources, destination, single_remote=False, env=None):
    """Verify that the source- and destination config is sound.

    Checks that sources are remote if destination is local and vice versa,
    plus raises error for multiple remote sources in PuTTY, which is not
    supported by `pscp`.

    Args:
      sources: [FileReference], see SCPCommand.sources.
      destination: FileReference, see SCPCommand.destination.
      single_remote: bool, if True, enforce that all remote sources refer to the
        same Remote (user and host).
      env: Environment, the current environment.

    Raises:
      InvalidConfigurationError: The source/destination configuration is
        invalid.
    """
    env = env or Environment.Current()

    if not sources:
      raise InvalidConfigurationError(
          'No sources provided.', sources, destination
      )

    if destination.remote:  # local -> remote
      if any([src.remote for src in sources]):
        raise InvalidConfigurationError(
            'All sources must be local files when destination is remote.',
            sources,
            destination,
        )
    else:  # remote -> local
      if env.suite is Suite.PUTTY and len(sources) != 1:
        raise InvalidConfigurationError(
            'Multiple remote sources not supported by PuTTY.',
            sources,
            destination,
        )
      if not all([src.remote for src in sources]):
        raise InvalidConfigurationError(
            'Source(s) must be remote when destination is local.',
            sources,
            destination,
        )
      if single_remote and len(set([src.remote for src in sources])) != 1:
        raise InvalidConfigurationError(
            'All sources must refer to the same remote when destination is '
            'local.',
            sources,
            destination,
        )

  def Build(self, env=None):
    """Construct the actual command according to the given environment.

    Args:
      env: Environment, to construct the command for (or current if None).

    Raises:
      InvalidConfigurationError: The source/destination configuration is
        invalid.
      MissingCommandError: If SCP command(s) required were not found.

    Returns:
      [str], the command args (where the first arg is the command itself).
    """
    env = env or Environment.Current()
    if not env.scp:
      raise MissingCommandError(
          'The current environment lacks an SCP (secure copy) client.'
      )
    self.Verify(self.sources, self.destination, env=env)

    args = [env.scp]

    if self.recursive:
      args.append('-r')

    if self.compress:
      args.append('-C')

    if self.port:
      args.extend(['-P', self.port])

    if self.cert_file and env.suite is Suite.OPENSSH:
      self.options['CertificateFile'] = self.cert_file

    if self.identity_list:
      for identity_file in self.identity_list:
        args.extend(['-i', identity_file])
    elif self.identity_file:
      identity_file = self.identity_file
      if env.suite is Suite.PUTTY and not identity_file.endswith('.ppk'):
        identity_file += '.ppk'
      args.extend(['-i', identity_file])

    # SSH config options
    if env.suite is Suite.OPENSSH:
      # Always, always deterministic order
      for key, value in sorted(six.iteritems(self.options)):
        args.extend(['-o', '{k}={v}'.format(k=key, v=value)])

    args.extend(_BuildIapTunnelProxyCommandArgs(self.iap_tunnel_args, env))
    args.extend(self.extra_flags)

    # Positionals
    args.extend([source.ToArg() for source in self.sources])
    args.append(self.destination.ToArg())
    return args

  def Run(self, env=None, putty_force_connect=False):
    """Run the SCP command using the given environment.

    Args:
      env: Environment, environment to run in (or current if None).
      putty_force_connect: bool, whether to inject 'y' into the prompts for
        `pscp`, which is insecure and not recommended. It serves legacy
        compatibility purposes for existing usages only; DO NOT SET THIS IN NEW
        CODE.

    Raises:
      InvalidConfigurationError: The source/destination configuration is
        invalid.
      MissingCommandError: If SCP command(s) not found.
      CommandError: SCP command failed to copy the file(s).
    """
    env = env or Environment.Current()
    args = self.Build(env)
    log.debug('Running command [{}].'.format(' '.join(args)))
    # pscp asks on (1) first connection and (2) fingerprint mismatch.
    # This ensures pscp will always allow the connection.
    # TODO(b/35355795): Work out a better solution for PuTTY.
    if env.suite is Suite.PUTTY and putty_force_connect:
      in_str = 'y\n'
    else:
      in_str = None
    status = execution_utils.Exec(args, no_exit=True, in_str=in_str)
    if status:
      raise CommandError(args[0], return_code=status)


class SSHPoller(object):
  """Represents an SSH command that polls for connectivity.

  Using a poller is not ideal, because each attempt is a separate connection
  attempt, meaning that the user might be prompted for a passphrase or to
  approve a server identity by the underlying ssh tool that we do not control.
  Always assume that polling for connectivity using this method is an operation
  that requires user action.
  """

  def __init__(
      self,
      remote,
      port=None,
      identity_file=None,
      options=None,
      extra_flags=None,
      max_wait_ms=60 * 1000,
      sleep_ms=5 * 1000,
      iap_tunnel_args=None,
  ):
    """Construct a poller for an SSH connection.

    Args:
      remote: Remote, the remote to poll.
      port: str, port to poll.
      identity_file: str, path to private key file.
      options: {str: str}, options (`-o`) for OpenSSH, see `ssh_config(5)`.
      extra_flags: [str], extra flags to append to ssh invocation. Both binary
        style flags `['-b']` and flags with values `['-k', 'v']` are accepted.
      max_wait_ms: int, number of ms to wait before raising.
      sleep_ms: int, time between trials.
      iap_tunnel_args: iap_tunnel.SshTunnelArgs or None, information about IAP
        Tunnel.
    """
    self.ssh_command = SSHCommand(
        remote,
        port=port,
        identity_file=identity_file,
        options=options,
        extra_flags=extra_flags,
        remote_command=[':'],
        tty=False,
        iap_tunnel_args=iap_tunnel_args,
    )
    self._sleep_ms = sleep_ms
    self._retryer = retry.Retryer(max_wait_ms=max_wait_ms, jitter_ms=0)

  def Poll(self, env=None, putty_force_connect=False):
    """Poll a remote for connectivity within the given timeout.

    The SSH command may prompt the user. It is recommended to wrap this call in
    a progress tracker. If this method returns, a connection was successfully
    established. If not, this method will raise.

    Args:
      env: Environment, environment to run in (or current if None).
      putty_force_connect: bool, whether to inject 'y' into the prompts for
        `plink`, which is insecure and not recommended. It serves legacy
        compatibility purposes for existing usages only; DO NOT SET THIS IN NEW
        CODE.

    Raises:
      MissingCommandError: If SSH command(s) not found.
      core.retry.WaitException: SSH command failed, possibly due to short
        timeout. There is no way to distinguish between a timeout error and a
        misconfigured connection.
    """
    env = env or Environment.Current()
    run_args = {
        'env': env,
        'putty_force_connect': putty_force_connect,
    }

    # Ideally we don't want the poller to consume data on stdin that should be
    # piped to the remote command. With OpenSSH we can pass /dev/null as stdin,
    # since it reads prompt responses (host key confirmations, passwords, etc.)
    # directly from /dev/tty.
    # We can't do the same with PuTTY, as it reads prompt responses from stdin.
    if env.suite is Suite.OPENSSH:
      run_args['explicit_input_file'] = subprocess.DEVNULL

    self._retryer.RetryOnException(
        self.ssh_command.Run,
        kwargs=run_args,
        should_retry_if=lambda exc_type, *args: exc_type is CommandError,
        sleep_ms=self._sleep_ms,
    )


class FileReference(object):
  """A reference to a local or remote file (or directory) for SCP.

  Attributes:
    path: str, The path to the file.
    remote: Remote or None, the remote referred or None if local.
  """

  def __init__(self, path, remote=None):
    """Constructor for FileReference.

    Args:
      path: str, The path to the file.
      remote: Remote or None, the remote referred or None if local.
    """
    self.path = path
    self.remote = remote

  def ToArg(self):
    """Convert to a positional argument, in the form expected by `scp`/`pscp`.

    Returns:
      str, A string on the form `remote:path` if remote or `path` if local.
    """
    if not self.remote:
      return self.path
    return '{remote}:{path}'.format(remote=self.remote.ToArg(), path=self.path)

  @classmethod
  def FromPath(cls, path):
    """Convert an SCP-style positional argument to a file reference.

    Note that this method does not raise. No lookup of either local or remote
    file presence exists.

    Args:
      path: str, A path on the canonical scp form `[remote:]path`. If remote,
        `path` can be empty, e.g. `me@host:`.

    Returns:
      FileReference, the constructed object.
    """
    # If local drive given, it overrides a potential remote pattern match
    local_drive = os.path.splitdrive(path)[0]
    remote_arg, sep, file_path = path.partition(':')
    remote = Remote.FromArg(remote_arg) if sep else None
    if remote and not local_drive:
      return cls(path=file_path, remote=remote)
    else:
      return cls(path=path)

  def __eq__(self, other):
    return type(self) is type(other) and self.ToArg() == other.ToArg()

  def __ne__(self, other):
    return not self.__eq__(other)

  def __repr__(self):
    return self.ToArg()