HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/platform/bq/gcloud_wrapper/gcloud_runner.py
#!/usr/bin/env python
"""Utilities to run gcloud for the BQ CLI."""

import logging
import os
import subprocess
import sys
from typing import List, Optional

from typing_extensions import TypeAlias

from pyglib import resources

if sys.version_info >= (3, 9):
  GcloudPopen: TypeAlias = subprocess.Popen[str]
else:
  # Before python 3.9 the type `subprocess.Popen[str]` is unsupported.
  GcloudPopen: TypeAlias = subprocess.Popen  # pylint: disable=g-bare-generic

_gcloud_path = None


def _get_gcloud_path() -> str:
  """Returns the string to use to call gcloud."""
  global _gcloud_path
  if _gcloud_path:
    logging.info('Found cached gcloud path: %s', _gcloud_path)
    return _gcloud_path

  if 'nt' == os.name:
    binary = 'gcloud.cmd'
  else:
    binary = 'gcloud'

  # If a gcloud binary has been bundled with this code then use that version
  # instead of the system installed version.
  try:
    binary = resources.GetResourceFilename(
        'google3/cloud/sdk/gcloud/gcloud.par'
    )
  except FileNotFoundError:
    pass

  logging.info('Found gcloud path: %s', binary)
  _gcloud_path = binary
  return binary


def run_gcloud_command(
    cmd: List[str], stderr: Optional[int] = None
) -> GcloudPopen:
  """Runs the given gcloud command and returns the Popen object."""
  gcloud_path = _get_gcloud_path()
  logging.info('Running gcloud command: %s %s', gcloud_path, ' '.join(cmd))
  return subprocess.Popen(
      [gcloud_path] + cmd,
      stdout=subprocess.PIPE,
      stderr=stderr,
      universal_newlines=True,
  )