HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/core/execution_utils.py
# -*- coding: utf-8 -*- #
# Copyright 2013 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Functions to help with shelling out to other commands."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals


import contextlib
import errno
import os
import re
import signal
import subprocess
import sys
import threading
import time


from googlecloudsdk.core import argv_utils
from googlecloudsdk.core import config
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.configurations import named_configs
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import parallel
from googlecloudsdk.core.util import platforms

import six
from six.moves import map


class OutputStreamProcessingException(exceptions.Error):
  """Error class for errors raised during output stream processing."""


class PermissionError(exceptions.Error):
  """User does not have execute permissions."""

  def __init__(self, error):
    super(PermissionError, self).__init__(
        '{err}\nPlease verify that you have execute permission for all '
        'files in your CLOUD SDK bin folder'.format(err=error))


class InvalidCommandError(exceptions.Error):
  """Command entered cannot be found."""

  def __init__(self, cmd):
    super(InvalidCommandError, self).__init__(
        '{cmd}: command not found'.format(cmd=cmd))

try:
  # pylint:disable=invalid-name
  TIMEOUT_EXPIRED_ERR = subprocess.TimeoutExpired

# subprocess.TimeoutExpired and subprocess.Popen.wait are only available in
# py3.3, use our own TimeoutExpired and SubprocessTimeoutWrapper classes in
# earlier versions. Callers that need to wait for subprocesses should catch
# TIMEOUT_EXPIRED_ERR instead of the underlying version-specific errors.
except AttributeError:

  class TimeoutExpired(exceptions.Error):
    """Simulate subprocess.TimeoutExpired on old (<3.3) versions of Python."""

  # pylint:disable=invalid-name
  TIMEOUT_EXPIRED_ERR = TimeoutExpired

  class SubprocessTimeoutWrapper:
    """Forwarding wrapper for subprocess.Popen, adds timeout arg to wait.

    subprocess.Popen.wait doesn't provide a timeout in versions < 3.3. This
    class wraps subprocess.Popen, adds a backported wait that includes the
    timeout arg, and forwards other calls to the underlying subprocess.Popen.

    Callers generally shouldn't use this class directly: Subprocess will
    return either a subprocess.Popen or SubprocessTimeoutWrapper as
    appropriate based on the available version of subprocesses.

    See
    https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait.
    """

    def __init__(self, proc):
      self.proc = proc

    # pylint:disable=invalid-name
    def wait(self, timeout=None):
      """Busy-wait for wrapped process to have a return code.

      Args:
        timeout: int, Seconds to wait before raising TimeoutExpired.

      Returns:
        int, The subprocess return code.

      Raises:
        TimeoutExpired: if subprocess doesn't finish before the given timeout.
      """
      if timeout is None:
        return self.proc.wait()

      now = time.time()
      later = now + timeout
      delay = 0.01  # 10ms
      ret = self.proc.poll()
      while ret is None:
        if time.time() > later:
          raise TimeoutExpired()
        time.sleep(delay)
        ret = self.proc.poll()
      return ret

    def __getattr__(self, name):
      return getattr(self.proc, name)


# Doesn't work in par or stub files.
def GetPythonExecutable():
  """Gets the path to the Python interpreter that should be used."""
  cloudsdk_python = encoding.GetEncodedValue(os.environ, 'CLOUDSDK_PYTHON')
  if cloudsdk_python:
    return cloudsdk_python
  python_bin = sys.executable
  if not python_bin:
    raise ValueError('Could not find Python executable.')
  return python_bin


# From https://en.wikipedia.org/wiki/Unix_shell#Bourne_shell_compatible
# Many scripts that we execute via execution_utils are bash scripts, and we need
# a compatible shell to run them.
# zsh, was initially on this list, but it doesn't work 100% without running it
# in `emulate sh` mode.
_BORNE_COMPATIBLE_SHELLS = [
    'ash',
    'bash',
    'busybox'
    'dash',
    'ksh',
    'mksh',
    'pdksh',
    'sh',
]


def _GetShellExecutable():
  """Gets the path to the Shell that should be used.

  First tries the current environment $SHELL, if set, then `bash` and `sh`. The
  first of these that is found is used.

  The shell must be Borne-compatible, as the commands that we execute with it
  are often bash/sh scripts.

  Returns:
    str, the path to the shell

  Raises:
    ValueError: if no Borne compatible shell is found
  """
  shells = ['/bin/bash', '/bin/sh']

  user_shell = encoding.GetEncodedValue(os.environ, 'SHELL')
  if user_shell and os.path.basename(user_shell) in _BORNE_COMPATIBLE_SHELLS:
    shells.insert(0, user_shell)

  for shell in shells:
    if os.path.isfile(shell):
      return shell

  raise ValueError("You must set your 'SHELL' environment variable to a "
                   "valid Borne-compatible shell executable to use this tool.")


def _GetToolArgs(interpreter, interpreter_args, executable_path, *args):
  tool_args = []
  if interpreter:
    tool_args.append(interpreter)
  if interpreter_args:
    tool_args.extend(interpreter_args)
  tool_args.append(executable_path)
  tool_args.extend(list(args))
  return tool_args


def GetToolEnv(env=None):
  """Generate the environment that should be used for the subprocess.

  Args:
    env: {str, str}, An existing environment to augment.  If None, the current
      environment will be cloned and used as the base for the subprocess.

  Returns:
    The modified env.
  """
  if env is None:
    env = dict(os.environ)
  env = encoding.EncodeEnv(env)
  encoding.SetEncodedValue(env, 'CLOUDSDK_WRAPPER', '1')

  # Flags can set properties which override the properties file and the existing
  # env vars.  We need to propagate them to children processes through the
  # environment so that those commands will use the same settings.
  for s in properties.VALUES:
    for p in s:
      encoding.SetEncodedValue(
          env, p.EnvironmentName(), p.Get(required=False, validate=False))

  # Configuration needs to be handled separately because it's not a real
  # property (although it behaves like one).
  encoding.SetEncodedValue(
      env, config.CLOUDSDK_ACTIVE_CONFIG_NAME,
      named_configs.ConfigurationStore.ActiveConfig().name)

  return env


# Doesn't work in par or stub files.
def ArgsForPythonTool(executable_path, *args, **kwargs):
  """Constructs an argument list for calling the Python interpreter.

  Args:
    executable_path: str, The full path to the Python main file.
    *args: args for the command
    **kwargs: python: str, path to Python executable to use (defaults to
      automatically detected)

  Returns:
    An argument list to execute the Python interpreter

  Raises:
    TypeError: if an unexpected keyword argument is passed
  """
  unexpected_arguments = set(kwargs) - set(['python'])
  if unexpected_arguments:
    raise TypeError(("ArgsForPythonTool() got unexpected keyword arguments "
                     "'[{0}]'").format(', '.join(unexpected_arguments)))
  python_executable = kwargs.get('python') or GetPythonExecutable()
  python_args_str = encoding.GetEncodedValue(
      os.environ, 'CLOUDSDK_PYTHON_ARGS', '')
  python_args = python_args_str.split()
  return _GetToolArgs(
      python_executable, python_args, executable_path, *args)


def ArgsForCMDTool(executable_path, *args):
  """Constructs an argument list for calling the cmd interpreter.

  Args:
    executable_path: str, The full path to the cmd script.
    *args: args for the command

  Returns:
    An argument list to execute the cmd interpreter
  """
  return _GetToolArgs('cmd', ['/c'], executable_path, *args)


def ArgsForExecutableTool(executable_path, *args):
  """Constructs an argument list for an executable.

   Can be used for calling a native binary or shell executable.

  Args:
    executable_path: str, The full path to the binary.
    *args: args for the command

  Returns:
    An argument list to execute the native binary
  """
  return _GetToolArgs(None, None, executable_path, *args)


# Works in regular installs as well as hermetic par and stub files. Doesn't work
# in classic par and stub files.
def ArgsForGcloud():
  """Constructs an argument list to run gcloud."""
  if not sys.executable:
    # In hermetic par/stub files sys.executable is None. In regular installs,
    # and in classic par/stub files it is a non-empty string.
    return _GetToolArgs(None, None, argv_utils.GetDecodedArgv()[0])
  return ArgsForPythonTool(config.GcloudPath())


class _ProcessHolder(object):
  """Process holder that can handle signals raised during processing."""

  def __init__(self):
    self.process = None
    self.signum = None

  def Handler(self, signum, unused_frame):
    """Handle the intercepted signal."""
    self.signum = signum
    if self.process:
      log.debug('Subprocess [{pid}] got [{signum}]'.format(
          signum=signum,
          pid=self.process.pid
      ))
      # We could have jumped to the signal handler between cleaning up our
      # finished child process in communicate() and removing the signal handler.
      # Check to see if our process is still running before we attempt to send
      # it a signal. If poll() returns None, even if the process dies right
      # between that and the terminate() call, the terminate() call will still
      # complete without an error (it just might send a signal to a zombie
      # process).
      if self.process.poll() is None:
        self.process.terminate()
      # The return code will be checked later in the normal processing flow.


@contextlib.contextmanager
def ReplaceEnv(**env_vars):
  """Temporarily set process environment variables."""
  old_environ = dict(os.environ)
  os.environ.update(env_vars)
  try:
    yield
  finally:
    os.environ.clear()
    os.environ.update(old_environ)


@contextlib.contextmanager
def _ReplaceSignal(signo, handler):
  old_handler = signal.signal(signo, handler)
  try:
    yield
  finally:
    signal.signal(signo, old_handler)


def _Exec(args,
          process_holder,
          env=None,
          out_func=None,
          err_func=None,
          in_str=None,
          **extra_popen_kwargs):
  """See Exec docstring."""
  if out_func:
    extra_popen_kwargs['stdout'] = subprocess.PIPE
  if err_func:
    extra_popen_kwargs['stderr'] = subprocess.PIPE
  if in_str:
    extra_popen_kwargs['stdin'] = subprocess.PIPE
  try:
    if args and isinstance(args, list):
      # On Python 2.x on Windows, the first arg can't be unicode. We encode
      # encode it anyway because there is really nothing else we can do if
      # that happens.
      # https://bugs.python.org/issue19264
      args = [encoding.Encode(a) for a in args]
    p = subprocess.Popen(args, env=GetToolEnv(env=env), **extra_popen_kwargs)
  except OSError as err:
    if err.errno == errno.EACCES:
      raise PermissionError(err.strerror)
    elif err.errno == errno.ENOENT:
      raise InvalidCommandError(args[0])
    raise
  process_holder.process = p

  if process_holder.signum is not None:
    # This covers the small possibility that process_holder handled a
    # signal when the process was starting but not yet set to
    # process_holder.process.
    if p.poll() is None:
      p.terminate()

  if isinstance(in_str, six.text_type):
    in_str = in_str.encode('utf-8')
  stdout, stderr = list(map(encoding.Decode, p.communicate(input=in_str)))

  if out_func:
    out_func(stdout)
  if err_func:
    err_func(stderr)
  return p.returncode


def Exec(args,
         env=None,
         no_exit=False,
         out_func=None,
         err_func=None,
         in_str=None,
         **extra_popen_kwargs):
  """Emulates the os.exec* set of commands, but uses subprocess.

  This executes the given command, waits for it to finish, and then exits this
  process with the exit code of the child process.

  Args:
    args: [str], The arguments to execute.  The first argument is the command.
    env: {str: str}, An optional environment for the child process.
    no_exit: bool, True to just return the exit code of the child instead of
      exiting.
    out_func: str->None, a function to call with the stdout of the executed
      process. This can be e.g. log.file_only_logger.debug or log.out.write.
    err_func: str->None, a function to call with the stderr of the executed
      process. This can be e.g. log.file_only_logger.debug or log.err.write.
    in_str: bytes or str, input to send to the subprocess' stdin.
    **extra_popen_kwargs: Any additional kwargs will be passed through directly
      to subprocess.Popen

  Returns:
    int, The exit code of the child if no_exit is True, else this method does
    not return.

  Raises:
    PermissionError: if user does not have execute permission for cloud sdk bin
    files.
    InvalidCommandError: if the command entered cannot be found.
  """
  log.debug('Executing command: %s', args)
  # We use subprocess instead of execv because windows does not support process
  # replacement.  The result of execv on windows is that a new processes is
  # started and the original is killed.  When running in a shell, the prompt
  # returns as soon as the parent is killed even though the child is still
  # running.  subprocess waits for the new process to finish before returning.
  process_holder = _ProcessHolder()

  # pylint:disable=protected-access
  # Python 3 has a cleaner way to check if on main thread, but must support PY2.
  if isinstance(threading.current_thread(), threading._MainThread):
    # pylint:enable=protected-access
    # Signal replacement is not allowed by Python on non-main threads.
    # https://bugs.python.org/issue38904
    with _ReplaceSignal(signal.SIGTERM, process_holder.Handler):
      with _ReplaceSignal(signal.SIGINT, process_holder.Handler):
        ret_val = _Exec(args, process_holder, env, out_func, err_func, in_str,
                        **extra_popen_kwargs)
  else:
    ret_val = _Exec(args, process_holder, env, out_func, err_func, in_str,
                    **extra_popen_kwargs)

  if no_exit and process_holder.signum is None:
    return ret_val
  sys.exit(ret_val)


def Subprocess(args, env=None, **extra_popen_kwargs):
  """Run subprocess.Popen with optional timeout and custom env.

  Returns a running subprocess. Depending on the available version of the
  subprocess library, this will return either a subprocess.Popen or a
  SubprocessTimeoutWrapper (which forwards calls to a subprocess.Popen).
  Callers should catch TIMEOUT_EXPIRED_ERR instead of
  subprocess.TimeoutExpired to be compatible with both classes.

  Args:
    args: [str], The arguments to execute.  The first argument is the command.
    env: {str: str}, An optional environment for the child process.
    **extra_popen_kwargs: Any additional kwargs will be passed through directly
      to subprocess.Popen

  Returns:
    subprocess.Popen or SubprocessTimeoutWrapper, The running subprocess.

  Raises:
    PermissionError: if user does not have execute permission for cloud sdk bin
    files.
    InvalidCommandError: if the command entered cannot be found.
  """
  # Handle unicode-encoded args, see _Exec.
  try:
    if args and isinstance(args, list):
      args = [encoding.Encode(a) for a in args]
    p = subprocess.Popen(args, env=GetToolEnv(env=env), **extra_popen_kwargs)
  except OSError as err:
    if err.errno == errno.EACCES:
      raise PermissionError(err.strerror)
    elif err.errno == errno.ENOENT:
      raise InvalidCommandError(args[0])
    raise
  process_holder = _ProcessHolder()
  process_holder.process = p
  if process_holder.signum is not None:
    if p.poll() is None:
      p.terminate()

  try:
    return SubprocessTimeoutWrapper(p)
  except NameError:
    return p


def _ProcessStreamHandler(proc, err=False, handler=log.Print):
  """Process output stream from a running subprocess in realtime."""
  stream = proc.stderr if err else proc.stdout
  stream_reader = stream.readline
  while True:
    line = stream_reader() or b''
    if not line and proc.poll() is not None:
      try:
        stream.close()
      except OSError:
        pass  # This is thread cleanup so we should just
        # exit so runner can Join()
      break
    line_str = line.decode('utf-8')
    line_str = line_str.rstrip('\r\n')
    if line_str:
      handler(line_str)


def _StreamSubprocessOutput(proc,
                            raw=False,
                            stdout_handler=log.Print,
                            stderr_handler=log.status.Print,
                            capture=False):
  """Log stdout and stderr output from running sub-process."""
  stdout = []
  stderr = []
  with ReplaceEnv(PYTHONUNBUFFERED='1'):
    while True:
      out_line = proc.stdout.readline() or b''
      err_line = proc.stderr.readline() or b''
      if not (err_line or out_line) and proc.poll() is not None:
        break
      if out_line:
        if capture:
          stdout.append(out_line)
        out_str = out_line.decode('utf-8')
        out_str = out_str.rstrip('\r\n') if not raw else out_str
        stdout_handler(out_str)

      if err_line:
        if capture:
          stderr.append(err_line)
        err_str = err_line.decode('utf-8')
        err_str = err_str.rstrip('\r\n') if not raw else err_str
        stderr_handler(err_str)
  return proc.returncode, stdout, stderr


def _KillProcIfRunning(proc):
  """Kill process and close open streams."""
  if proc:
    code = None
    if hasattr(proc, 'returncode'):
      code = proc.returncode
    elif hasattr(proc, 'exitcode'):
      code = proc.exitcode
    if code is None or proc.poll() is None:
      proc.terminate()
    try:
      if proc.stdin and not proc.stdin.closed:
        proc.stdin.close()
      if proc.stdout and not proc.stdout.closed:
        proc.stdout.close()
      if proc.stderr and not proc.stderr.closed:
        proc.stderr.close()
    except OSError:
      pass  # Clean Up


def ExecWithStreamingOutput(args,
                            env=None,
                            no_exit=False,
                            out_func=None,
                            err_func=None,
                            in_str=None,
                            **extra_popen_kwargs):
  """Emulates the os.exec* set of commands, but uses subprocess.

  This executes the given command, waits for it to finish, and then exits this
  process with the exit code of the child process. Allows realtime processing of
  stderr and stdout from subprocess using threads.

  Args:
    args: [str], The arguments to execute.  The first argument is the command.
    env: {str: str}, An optional environment for the child process.
    no_exit: bool, True to just return the exit code of the child instead of
      exiting.
    out_func: str->None, a function to call with each line of the stdout of the
      executed process. This can be e.g. log.file_only_logger.debug or
      log.out.write.
    err_func: str->None, a function to call with each line of the stderr of
      the executed process. This can be e.g. log.file_only_logger.debug or
      log.err.write.
    in_str: bytes or str, input to send to the subprocess' stdin.
    **extra_popen_kwargs: Any additional kwargs will be passed through directly
      to subprocess.Popen

  Returns:
    int, The exit code of the child if no_exit is True, else this method does
    not return.

  Raises:
    PermissionError: if user does not have execute permission for cloud sdk bin
    files.
    InvalidCommandError: if the command entered cannot be found.
  """
  log.debug('Executing command: %s', args)
  # We use subprocess instead of execv because windows does not support process
  # replacement.  The result of execv on windows is that a new processes is
  # started and the original is killed.  When running in a shell, the prompt
  # returns as soon as the parent is killed even though the child is still
  # running.  subprocess waits for the new process to finish before returning.
  env = GetToolEnv(env=env)
  process_holder = _ProcessHolder()
  with _ReplaceSignal(signal.SIGTERM, process_holder.Handler):
    with _ReplaceSignal(signal.SIGINT, process_holder.Handler):
      out_handler_func = out_func or log.Print
      err_handler_func = err_func or log.status.Print
      if in_str:
        extra_popen_kwargs['stdin'] = subprocess.PIPE
      try:
        if args and isinstance(args, list):
          # On Python 2.x on Windows, the first arg can't be unicode. We encode
          # encode it anyway because there is really nothing else we can do if
          # that happens.
          # https://bugs.python.org/issue19264
          args = [encoding.Encode(a) for a in args]
        p = subprocess.Popen(args, env=env, stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE, **extra_popen_kwargs)
        process_holder.process = p
        if in_str:
          in_str = six.text_type(in_str).encode('utf-8')
          try:
            p.stdin.write(in_str)
            p.stdin.close()
          except OSError as exc:
            if (exc.errno == errno.EPIPE or
                exc.errno == errno.EINVAL):
              pass  # Obey same conventions as subprocess.communicate()
            else:
              _KillProcIfRunning(p)
              raise OutputStreamProcessingException(exc)

        try:
          with parallel.GetPool(2) as pool:
            std_out_future = pool.ApplyAsync(_ProcessStreamHandler,
                                             (p, False, out_handler_func))
            std_err_future = pool.ApplyAsync(_ProcessStreamHandler,
                                             (p, True, err_handler_func))
            std_out_future.Get()
            std_err_future.Get()
        except Exception as e:
          _KillProcIfRunning(p)
          raise OutputStreamProcessingException(e)

      except OSError as err:
        if err.errno == errno.EACCES:
          raise PermissionError(err.strerror)
        elif err.errno == errno.ENOENT:
          raise InvalidCommandError(args[0])
        raise

      if process_holder.signum is not None:
        # This covers the small possibility that process_holder handled a
        # signal when the process was starting but not yet set to
        # process_holder.process.
        _KillProcIfRunning(p)

      ret_val = p.returncode

  if no_exit and process_holder.signum is None:
    return ret_val
  sys.exit(ret_val)


def ExecWithStreamingOutputNonThreaded(args,
                                       env=None,
                                       no_exit=False,
                                       out_func=None,
                                       err_func=None,
                                       in_str=None,
                                       raw_output=False,
                                       **extra_popen_kwargs):
  """Emulates the os.exec* set of commands, but uses subprocess.

  This executes the given command, waits for it to finish, and then exits this
  process with the exit code of the child process. Allows realtime processing of
  stderr and stdout from subprocess without threads.

  Args:
    args: [str], The arguments to execute.  The first argument is the command.
    env: {str: str}, An optional environment for the child process.
    no_exit: bool, True to just return the exit code of the child instead of
      exiting.
    out_func: str->None, a function to call with each line of the stdout of the
      executed process. This can be e.g. log.file_only_logger.debug or
      log.out.write.
    err_func: str->None, a function to call with each line of the stderr of
      the executed process. This can be e.g. log.file_only_logger.debug or
      log.err.write.
    in_str: bytes or str, input to send to the subprocess' stdin.
    raw_output: bool, stream raw lines of output perserving line
      endings/formatting.
    **extra_popen_kwargs: Any additional kwargs will be passed through directly
      to subprocess.Popen

  Returns:
    int, The exit code of the child if no_exit is True, else this method does
    not return.

  Raises:
    PermissionError: if user does not have execute permission for cloud sdk bin
    files.
    InvalidCommandError: if the command entered cannot be found.
  """
  log.debug('Executing command: %s', args)
  # We use subprocess instead of execv because windows does not support process
  # replacement.  The result of execv on windows is that a new processes is
  # started and the original is killed.  When running in a shell, the prompt
  # returns as soon as the parent is killed even though the child is still
  # running.  subprocess waits for the new process to finish before returning.
  env = GetToolEnv(env=env)
  process_holder = _ProcessHolder()
  with _ReplaceSignal(signal.SIGTERM, process_holder.Handler):
    with _ReplaceSignal(signal.SIGINT, process_holder.Handler):
      out_handler_func = out_func or log.Print
      err_handler_func = err_func or log.status.Print
      if in_str:
        extra_popen_kwargs['stdin'] = subprocess.PIPE
      try:
        if args and isinstance(args, list):
          # On Python 2.x on Windows, the first arg can't be unicode. We encode
          # encode it anyway because there is really nothing else we can do if
          # that happens.
          # https://bugs.python.org/issue19264
          args = [encoding.Encode(a) for a in args]
        p = subprocess.Popen(args, env=env, stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE, **extra_popen_kwargs)

        if in_str:
          in_str = six.text_type(in_str).encode('utf-8')
          try:
            p.stdin.write(in_str)
            p.stdin.close()
          except OSError as exc:
            if (exc.errno == errno.EPIPE or
                exc.errno == errno.EINVAL):
              pass  # Obey same conventions as subprocess.communicate()
            else:
              _KillProcIfRunning(p)
              raise OutputStreamProcessingException(exc)

        try:
          _StreamSubprocessOutput(p, stdout_handler=out_handler_func,
                                  stderr_handler=err_handler_func,
                                  raw=raw_output)
        except Exception as e:
          _KillProcIfRunning(p)
          raise OutputStreamProcessingException(e)
      except OSError as err:
        if err.errno == errno.EACCES:
          raise PermissionError(err.strerror)
        elif err.errno == errno.ENOENT:
          raise InvalidCommandError(args[0])
        raise
      process_holder.process = p

      if process_holder.signum is not None:
        # This covers the small possibility that process_holder handled a
        # signal when the process was starting but not yet set to
        # process_holder.process.
        _KillProcIfRunning(p)

      ret_val = p.returncode

  if no_exit and process_holder.signum is None:
    return ret_val
  sys.exit(ret_val)


def UninterruptibleSection(stream, message=None):
  """Run a section of code with CTRL-C disabled.

  When in this context manager, the ctrl-c signal is caught and a message is
  printed saying that the action cannot be cancelled.

  Args:
    stream: the stream to write to if SIGINT is received
    message: str, optional: the message to write

  Returns:
    Context manager that is uninterruptible during its lifetime.
  """
  message = '\n\n{message}\n\n'.format(
      message=(message or 'This operation cannot be cancelled.'))
  def _Handler(unused_signal, unused_frame):
    stream.write(message)
  return CtrlCSection(_Handler)


def RaisesKeyboardInterrupt():
  """Run a section of code where CTRL-C raises KeyboardInterrupt."""
  def _Handler(signal, frame):  # pylint: disable=redefined-outer-name
    del signal, frame  # Unused in _Handler
    raise KeyboardInterrupt
  return CtrlCSection(_Handler)


def CtrlCSection(handler):
  """Run a section of code with CTRL-C redirected handler.

  Args:
    handler: func(), handler to call if SIGINT is received. In every case
      original Ctrl-C handler is not invoked.

  Returns:
    Context manager that redirects ctrl-c handler during its lifetime.
  """
  return _ReplaceSignal(signal.SIGINT, handler)


def KillSubprocess(p):
  """Kills a subprocess using an OS specific method when python can't do it.

  This also kills all processes rooted in this process.

  Args:
    p: the Popen or multiprocessing.Process object to kill

  Raises:
    RuntimeError: if it fails to kill the process
  """

  # This allows us to kill a Popen object or a multiprocessing.Process object
  code = None
  if hasattr(p, 'returncode'):
    code = p.returncode
  elif hasattr(p, 'exitcode'):
    code = p.exitcode

  if code is not None:
    # already dead
    return

  if platforms.OperatingSystem.Current() == platforms.OperatingSystem.WINDOWS:
    # Consume stdout so it doesn't show in the shell
    taskkill_process = subprocess.Popen(
        ['taskkill', '/F', '/T', '/PID', six.text_type(p.pid)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    (stdout, stderr) = taskkill_process.communicate()
    if taskkill_process.returncode != 0 and _IsTaskKillError(stderr):
      # Sometimes taskkill does things in the wrong order and the processes
      # disappear before it gets a chance to kill it.  This is exposed as an
      # error even though it's the outcome we want.
      raise RuntimeError(
          'Failed to call taskkill on pid {0}\nstdout: {1}\nstderr: {2}'
          .format(p.pid, stdout, stderr))

  else:
    # Create a mapping of ppid to pid for all processes, then kill all
    # subprocesses from the main process down

    # set env LANG for subprocess.Popen to be 'en_US.UTF-8'
    new_env = encoding.EncodeEnv(dict(os.environ))
    new_env['LANG'] = 'en_US.UTF-8'
    get_pids_process = subprocess.Popen(['ps', '-e',
                                         '-o', 'ppid=', '-o', 'pid='],
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        env=new_env)
    (stdout, stderr) = get_pids_process.communicate()
    stdout = stdout.decode('utf-8')
    if get_pids_process.returncode != 0:
      raise RuntimeError('Failed to get subprocesses of process: {0}'
                         .format(p.pid))

    # Create the process map
    pid_map = {}
    for line in stdout.strip().split('\n'):
      (ppid, pid) = re.match(r'\s*(\d+)\s+(\d+)', line).groups()
      ppid = int(ppid)
      pid = int(pid)
      children = pid_map.get(ppid)
      if not children:
        pid_map[ppid] = [pid]
      else:
        children.append(pid)

    # Expand all descendants of the main process
    all_pids = [p.pid]
    to_process = [p.pid]
    while to_process:
      current = to_process.pop()
      children = pid_map.get(current)
      if children:
        to_process.extend(children)
        all_pids.extend(children)

    # Kill all the subprocesses we found
    for pid in all_pids:
      _KillPID(pid)

    # put this in if you need extra info from the process itself
    # print p.communicate()


def _IsTaskKillError(stderr):
  """Returns whether the stderr output of taskkill indicates it failed.

  Args:
    stderr: the string error output of the taskkill command

  Returns:
    True iff the stderr is considered to represent an actual error.
  """
  # The taskkill "reason" string indicates why it fails. We consider the
  # following reasons to be acceptable. Reason strings differ among different
  # versions of taskkill. If you know a string is specific to a version, feel
  # free to document that here.
  non_error_reasons = (
      # The process might be in the midst of exiting.
      'Access is denied.',
      'The operation attempted is not supported.',
      'There is no running instance of the task.',
      'There is no running instance of the task to terminate.')
  non_error_patterns = (
      re.compile(r'The process "\d+" not found\.'),)
  for reason in non_error_reasons:
    if reason in stderr:
      return False
  for pattern in non_error_patterns:
    if pattern.search(stderr):
      return False

  return True


def _KillPID(pid):
  """Kills the given process with SIGTERM, then with SIGKILL if it doesn't stop.

  Args:
    pid: The process id of the process to check.
  """
  try:
    # Try sigterm first.
    os.kill(pid, signal.SIGTERM)

    # If still running, wait a few seconds to see if it dies.
    deadline = time.time() + 3
    while time.time() < deadline:
      if not _IsStillRunning(pid):
        return
      time.sleep(0.1)

    # No luck, just force kill it.
    os.kill(pid, signal.SIGKILL)
  except OSError as error:
    if 'No such process' not in error.strerror:
      exceptions.reraise(sys.exc_info()[1])


def _IsStillRunning(pid):
  """Determines if the given pid is still running.

  Args:
    pid: The process id of the process to check.

  Returns:
    bool, True if it is still running.
  """
  try:
    (actual_pid, code) = os.waitpid(pid, os.WNOHANG)
    if (actual_pid, code) == (0, 0):
      return True
  except OSError as error:
    if 'No child processes' not in error.strerror:
      exceptions.reraise(sys.exc_info()[1])
  return False