HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/lib/googlecloudsdk/api_lib/cloudbuild/logs.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Manage and stream build logs from Cloud Builds."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import collections
import re
import threading
import time

from apitools.base.py import exceptions as api_exceptions

from googlecloudsdk.api_lib.cloudbuild import cloudbuild_util
from googlecloudsdk.api_lib.logging import common
from googlecloudsdk.core import exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core.console import console_attr_os
from googlecloudsdk.core.credentials import requests as creds_requests
from googlecloudsdk.core.util import encoding

import requests

# Help text printed by GetGCLLogTailer() when the log tailing module cannot be
# imported; it tells the user how to make the grpc module available.
LOG_STREAM_HELP_TEXT = """
To live stream log output for this build, please ensure the grpc module is installed. Run:
  pip install grpcio
and set:
  export CLOUDSDK_PYTHON_SITEPACKAGES=1
"""

# Message carried by DefaultLogsBucketIsOutsideSecurityPerimeterException,
# which ThreadInterceptor records when tailing fails with an HTTP 403.
DEFAULT_LOGS_BUCKET_IS_OUTSIDE_SECURITY_PERIMETER_TEXT = """
The build is running, and logs are being written to the default logs bucket.
This tool can only stream logs if you are Viewer/Owner of the project and, if applicable, allowed by your VPC-SC security policy.

The default logs bucket is always outside any VPC-SC security perimeter.
If you want your logs saved inside your VPC-SC perimeter, use your own bucket.
See https://cloud.google.com/build/docs/securing-builds/store-manage-build-logs.
"""


class NoLogsBucketException(exceptions.Error):
  """Raised when a build has no logsBucket, so its logs cannot be streamed."""

  def __init__(self):
    super(NoLogsBucketException, self).__init__(
        'Build does not specify logsBucket, unable to stream logs')


class DefaultLogsBucketIsOutsideSecurityPerimeterException(exceptions.Error):
  """Error whose message explains the default logs bucket / VPC-SC limitation."""

  def __init__(self):
    explanation = DEFAULT_LOGS_BUCKET_IS_OUTSIDE_SECURITY_PERIMETER_TEXT
    super(DefaultLogsBucketIsOutsideSecurityPerimeterException,
          self).__init__(explanation)


# Plain record of an HTTP reply: status code, headers and raw body.
Response = collections.namedtuple('Response', ('status', 'headers', 'body'))


class RequestsLogTailer(object):
  """LogTailer transport that issues HTTP requests through a requests session."""

  def __init__(self):
    """Obtain the session used for all requests from creds_requests."""
    self.session = creds_requests.GetSession()

  def Request(self, url, cursor):
    """Issue a ranged GET for everything in `url` from byte `cursor` onwards.

    Args:
      url: The URL to fetch.
      cursor: Byte offset sent in the Range header as "bytes=<cursor>-".

    Returns:
      Response, holding the reply's status code, headers and content.

    Raises:
      api_exceptions.CommunicationError: if the request raises a
        requests.exceptions.RequestException.
    """
    range_header = {'Range': 'bytes={0}-'.format(cursor)}
    try:
      reply = self.session.request('GET', url, headers=range_header)
      return Response(reply.status_code, reply.headers, reply.content)
    except requests.exceptions.RequestException as e:
      raise api_exceptions.CommunicationError('Failed to connect: %s' % e)


def GetGCLLogTailer():
  """Create a GCL LogTailer, or explain how to enable one.

  Returns:
    A tailing.LogTailer instance, or None (after printing
    LOG_STREAM_HELP_TEXT) when the tailing module cannot be imported.
  """
  try:
    # pylint: disable=g-import-not-at-top
    from googlecloudsdk.api_lib.logging import tailing
    # pylint: enable=g-import-not-at-top
  except ImportError:
    log.out.Print(LOG_STREAM_HELP_TEXT)
    gcl_tailer = None
  else:
    gcl_tailer = tailing.LogTailer()
  return gcl_tailer


def IsCB4A(build):
  """Tell whether a build is a CB4A build, whose logs are printed differently.

  Args:
    build: Build resource to inspect.

  Returns:
    True when the build options name a cluster, or (if no cluster is set)
    an Anthos cluster membership; False otherwise.
  """
  options = build.options
  if not options:
    return False
  # A set cluster takes precedence: anthosCluster is only consulted without it.
  if options.cluster:
    return bool(options.cluster.name)
  if options.anthosCluster:
    return bool(options.anthosCluster.membership)
  return False


class TailerBase(object):
  """Base class for log tailer classes.

  The printing helpers write to `self.out`, which this class does not set;
  subclasses are expected to assign it.
  """
  LOG_OUTPUT_BEGIN = ' REMOTE BUILD OUTPUT '
  OUTPUT_LINE_CHAR = '-'

  def _ValidateScreenReader(self, text):
    """Modify output for better screen reader experience.

    Args:
      text: The log text to adjust; may be None or empty.

    Returns:
      `text` with every '---> ' marker removed when the screen reader
      property is enabled; otherwise `text` unchanged. Empty or None input is
      returned as is.
    """
    # Nothing to rewrite for None/empty text. Returning early also avoids a
    # TypeError on None payloads when the screen reader property is enabled.
    if not text:
      return text
    if properties.VALUES.accessibility.screen_reader.GetBool():
      # The marker is a plain literal, so no regular expression is needed.
      return text.replace('---> ', '')
    return text

  def _PrintLogLine(self, text):
    """Testing Hook: This method enables better verification of output.

    Args:
      text: The line to print; skipped when empty or None, or when there is
        no output stream. Trailing whitespace is stripped before printing.
    """
    if self.out and text:
      self.out.Print(text.rstrip())

  def _PrintFirstLine(self, msg=LOG_OUTPUT_BEGIN):
    """Print a pretty starting line to identify start of build output logs.

    Args:
      msg: Text centered within a terminal-wide run of OUTPUT_LINE_CHAR.
    """
    width, _ = console_attr_os.GetTermSize()
    self._PrintLogLine(msg.center(width, self.OUTPUT_LINE_CHAR))

  def _PrintLastLine(self, msg=''):
    """Print a pretty ending line to identify end of build output logs.

    Args:
      msg: Text centered within a terminal-wide run of OUTPUT_LINE_CHAR.
    """
    width, _ = console_attr_os.GetTermSize()
    # We print an extra blank visually separating the log from other output.
    self._PrintLogLine(msg.center(width, self.OUTPUT_LINE_CHAR) + '\n')


class GCLLogTailer(TailerBase):
  """Tails or prints a build's log entries from GCL as they become available."""

  def __init__(self,
               buildId,
               projectId,
               timestamp,
               logUrl=None,
               out=log.status,
               is_cb4a=False):
    """Record the build's identifiers and obtain a GCL tailer.

    Args:
      buildId: ID of the build whose logs are read.
      projectId: ID of the project the build belongs to.
      timestamp: Lower bound used in the `timestamp>=` filter clause of
        Print().
      logUrl: Optional URL printed once tailing has finished.
      out: The output stream to write the logs to.
      is_cb4a: Whether to filter on the tekton taskRun pod labels rather than
        on the cloudbuild log name.
    """
    self.tailer = GetGCLLogTailer()
    self.build_id = buildId
    self.project_id = projectId
    self.timestamp = timestamp
    self.log_url = logUrl
    self.out = out
    self.is_cb4a = is_cb4a
    self.buffer_window_seconds = 2
    self.stop = False

  @classmethod
  def FromBuild(cls, build, out=log.out):
    """Build a GCLLogTailer from a build resource.

    Args:
      build: Build resource, The build whose logs shall be streamed.
      out: The output stream to write the logs to.

    Returns:
      GCLLogTailer, the tailer of this build's logs.
    """
    cb4a = IsCB4A(build)
    return cls(
        buildId=build.id,
        projectId=build.projectId,
        timestamp=build.createTime,
        logUrl=build.logUrl,
        out=out,
        is_cb4a=cb4a)

  def Tail(self):
    """Tail the GCL logs and print any new bytes to the console."""
    # Nothing to do without a tailer, or once Stop() has been requested.
    if not self.tailer or self.stop:
      return

    if self.is_cb4a:
      # The labels starting with 'k8s-pod/' in the log entries from GKE-on-GCP
      # clusters are different from other labels. The dots '.' in the labels are
      # converted to '_'. For example, 'k8s-pod/tekton.dev/taskRun' is
      # converted to 'k8s-pod/tekton_dev/taskRun'.
      log_filter = ('labels."k8s-pod/tekton.dev/taskRun"="{build_id}" OR '
                    'labels."k8s-pod/tekton_dev/taskRun"="{build_id}"').format(
                        build_id=self.build_id)
    else:
      log_filter = ('logName="projects/{project_id}/logs/cloudbuild" AND '
                    'resource.type="build" AND '
                    'resource.labels.build_id="{build_id}"').format(
                        project_id=self.project_id, build_id=self.build_id)

    resource_names = [
        'projects/{project_id}'.format(project_id=self.project_id)
    ]
    entries = self.tailer.TailLogs(
        resource_names,
        log_filter,
        buffer_window_seconds=self.buffer_window_seconds)

    self._PrintFirstLine()
    for entry in entries:
      self._PrintLogLine(self._ValidateScreenReader(entry.text_payload))
    self._PrintLastLine(' BUILD FINISHED; TRUNCATING OUTPUT LOGS ')

    if self.log_url:
      self._PrintLogLine(
          'Logs are available at [{log_url}].'.format(log_url=self.log_url))

  def Stop(self):
    """Stop log tailing."""
    self.stop = True
    # Sleep to allow the Tailing API to send the last logs it buffered up
    time.sleep(self.buffer_window_seconds)
    if not self.tailer:
      return
    self.tailer.Stop()

  def Print(self):
    """Print GCL logs to the console."""
    if self.is_cb4a:
      # The labels starting with 'k8s-pod/' in the log entries from GKE-on-GCP
      # clusters are different from other labels. The dots '.' in the labels are
      # converted to '_'. For example, 'k8s-pod/tekton.dev/taskRun' is
      # converted to 'k8s-pod/tekton_dev/taskRun'.
      log_filter = ('(labels."k8s-pod/tekton.dev/taskRun"="{build_id}" OR '
                    'labels."k8s-pod/tekton_dev/taskRun"="{build_id}") AND '
                    'timestamp>="{timestamp}"').format(
                        build_id=self.build_id, timestamp=self.timestamp)
    else:
      log_filter = (
          'logName="projects/{project_id}/logs/cloudbuild" AND '
          'resource.type="build" AND '
          # timestamp needed for faster querying in GCL
          'timestamp>="{timestamp}" AND '
          'resource.labels.build_id="{build_id}"').format(
              project_id=self.project_id,
              timestamp=self.timestamp,
              build_id=self.build_id)

    entries = common.FetchLogs(
        log_filter=log_filter,
        order_by='asc',
        parent='projects/{project_id}'.format(project_id=self.project_id))

    self._PrintFirstLine()
    for entry in entries:
      self._PrintLogLine(self._ValidateScreenReader(entry.textPayload))
    self._PrintLastLine()


class GCSLogTailer(TailerBase):
  """Tails a log object stored in GCS, printing new content as it appears."""

  LOG_OUTPUT_INCOMPLETE = ' (possibly incomplete) '

  def __init__(self, bucket, obj, out=log.status):
    """Prepare to read the log object `obj` in `bucket` from its beginning.

    Args:
      bucket: Name of the bucket that holds the log object.
      obj: Name of the log object inside the bucket.
      out: The output stream to write the logs to.
    """
    self.transport = RequestsLogTailer()
    self.url = self._StorageUrl(bucket, obj)
    log.debug('GCS logfile url is ' + self.url)
    self.out = out
    self.stop = False
    # Byte offset of the next unread position in the log object.
    self.cursor = 0

  def _StorageUrl(self, bucket, obj):
    """Return the download URL for `obj` in `bucket` (mTLS host if enabled)."""
    if properties.VALUES.context_aware.use_client_certificate.GetBool():
      # mTLS is enabled.
      template = 'https://storage.mtls.googleapis.com/{bucket}/{obj}'
    else:
      template = 'https://storage.googleapis.com/{bucket}/{obj}'
    return template.format(bucket=bucket, obj=obj)

  @classmethod
  def FromBuild(cls, build, out=log.out):
    """Build a GCSLogTailer from a build resource.

    Args:
      build: Build resource, The build whose logs shall be streamed.
      out: The output stream to write the logs to.

    Raises:
      NoLogsBucketException: If the build does not specify a logsBucket.

    Returns:
      GCSLogTailer, the tailer of this build's logs.
    """
    location = build.logsBucket
    if not location:
      raise NoLogsBucketException()

    # Drop a leading gs:// scheme, if there is one.
    scheme = 'gs://'
    if location.startswith(scheme):
      location = location[len(scheme):]

    # Everything after the first '/' is a directory prefix inside the bucket.
    log_bucket, slash, remainder = location.partition('/')
    log_object_dir = remainder + '/' if slash else ''

    log_object = '{object}log-{id}.txt'.format(
        object=log_object_dir,
        id=build.id,
    )
    return cls(bucket=log_bucket, obj=log_object, out=out)

  def Poll(self, is_last=False):
    """Poll the GCS object and print any new bytes to the console.

    Args:
      is_last: True if this is the final poll operation.

    Raises:
      api_exceptions.HttpError: if GCS answers with a status code that is not
          handled here (anything other than 200, 206, 404, 416, 429 or 5xx).
      api_exceptions.CommunicationError: if there is trouble reaching the server
          and is_last=True.
    """
    try:
      res = self.transport.Request(self.url, self.cursor)
    except api_exceptions.CommunicationError:
      # Sometimes this request fails due to read timeouts (b/121307719). When
      # this happens we should just proceed and rely on the next poll to pick
      # up any missed logs. If this is the last request, there won't be another
      # request, and we can just fail.
      if not is_last:
        return
      raise

    status = res.status

    if status == 404:  # Not Found
      # Logfile hasn't been written yet (ie, build hasn't started).
      log.debug('Reading GCS logfile: 404 (no log yet; keep polling)')
      return

    if status == 416:  # Requested Range Not Satisfiable
      # We have consumed all available data. We'll get this a lot as we poll.
      log.debug('Reading GCS logfile: 416 (no new content; keep polling)')
      if is_last:
        self._PrintLastLine()
      return

    if status in (206, 200):  # Partial Content / OK
      # New content available. Print it!
      body = res.body
      log.debug('Reading GCS logfile: {code} (read {count} bytes)'.format(
          code=status, count=len(body)))
      # The opening banner is emitted only while nothing has been consumed.
      if self.cursor == 0:
        self._PrintFirstLine()
      self.cursor += len(body)
      text = encoding.Decode(body)
      if text is not None:
        text = self._ValidateScreenReader(text)
      self._PrintLogLine(text.rstrip('\n'))

      if is_last:
        self._PrintLastLine()
      return

    # For 429/503, there isn't much to do other than retry on the next poll.
    # If we get a 429 after the build has completed, the user may get incomplete
    # logs. This is expected to be rare enough to not justify building a complex
    # exponential retry system.
    if status == 429:  # Too Many Requests
      log.warning('Reading GCS logfile: 429 (server is throttling us)')
      if is_last:
        self._PrintLastLine(self.LOG_OUTPUT_INCOMPLETE)
      return

    if 500 <= status < 600:  # Server Error
      log.warning('Reading GCS logfile: got {0}, retrying'.format(status))
      if is_last:
        self._PrintLastLine(self.LOG_OUTPUT_INCOMPLETE)
      return

    # Default: any other codes are treated as errors.
    error_info = dict(res.headers)
    error_info['status'] = status
    raise api_exceptions.HttpError(error_info, res.body, self.url)

  def Tail(self):
    """Tail the GCS object and print any new bytes to the console."""
    while not self.stop:
      self.Poll()
      time.sleep(1)

    # Poll the logs one final time to ensure we have everything. We know this
    # final poll will get the full log contents because GCS is strongly
    # consistent and Cloud Build waits for logs to finish pushing before
    # marking the build complete.
    self.Poll(is_last=True)

  def Stop(self):
    """Stop log tailing."""
    self.stop = True

  def Print(self):
    """Print GCS logs to the console."""
    self.Poll(is_last=True)


class ThreadInterceptor(threading.Thread):
  """Thread that runs a callable and records API errors rather than raising.

  After the thread finishes, `exception` is None or the error to report.
  """

  def __init__(self, target):
    """Remember the callable to run.

    Args:
      target: Zero-argument callable invoked by run().
    """
    super(ThreadInterceptor, self).__init__()
    self.target = target
    self.exception = None

  def run(self):
    """Invoke the target, storing any HttpError/CommunicationError it raises."""
    try:
      self.target()
    except api_exceptions.HttpError as http_error:
      # The only way to successfully create a build and then be unable to read
      # the logs bucket is if you are using the default logs bucket and
      # VPC-SC.
      forbidden = http_error.status_code == 403
      self.exception = (
          DefaultLogsBucketIsOutsideSecurityPerimeterException()
          if forbidden else http_error)
    except api_exceptions.CommunicationError as comm_error:
      self.exception = comm_error


class CloudBuildClient(object):
  """Client for interacting with the Cloud Build API (and Cloud Build logs)."""

  def __init__(self,
               client=None,
               messages=None,
               support_gcl=False,
               polling_interval=1):
    """Set up the API client, messages module and polling behaviour.

    Args:
      client: API client to use; defaults to
        cloudbuild_util.GetClientInstance().
      messages: Messages module to use; defaults to
        cloudbuild_util.GetMessagesModule().
      support_gcl: Whether Stream() may tail logs from GCL.
      polling_interval: Seconds to sleep between build status polls.
    """
    self.client = client or cloudbuild_util.GetClientInstance()
    self.messages = messages or cloudbuild_util.GetMessagesModule()
    self.support_gcl = support_gcl
    self.polling_interval = polling_interval

  def GetBuild(self, build_ref):
    """Get a Build message.

    Args:
      build_ref: Build reference. Expects a cloudbuild.projects.locations.builds
        but also supports cloudbuild.projects.builds.

    Returns:
      Build resource
    """
    # Legacy build_refs (for cloudbuild.projects.builds) don't have a location
    # attached. Convert to the expected type and add the default location.
    is_legacy_ref = build_ref.Collection() == 'cloudbuild.projects.builds'
    if is_legacy_ref:
      build_ref = resources.REGISTRY.Create(
          collection='cloudbuild.projects.locations.builds',
          projectsId=build_ref.projectId,
          locationsId=cloudbuild_util.DEFAULT_REGION,
          buildsId=build_ref.id)

    get_request = self.messages.CloudbuildProjectsLocationsBuildsGetRequest(
        name=build_ref.RelativeName())
    return self.client.projects_locations_builds.Get(get_request)

  def ShouldStopTailer(self, build, build_ref, log_tailer, working_statuses):
    """Waits for the build to leave the working statuses, then stops the tailer.

    Args:
      build: Build object, containing build status
      build_ref: Build reference, The build whose logs shall be streamed.
      log_tailer: Specific log tailer object
      working_statuses: Valid working statuses that define we should continue
        tailing

    Returns:
      Build message, the completed or terminated build.
    """
    notice = ('Waiting for build to complete. Polling interval: ' +
              str(self.polling_interval) + ' second(s).')
    log.status.Print(notice)

    # Re-fetch the build until its status is no longer a working one.
    while build.status in working_statuses:
      build = self.GetBuild(build_ref)
      time.sleep(self.polling_interval)

    if log_tailer:
      log_tailer.Stop()
    return build

  def Stream(self, build_ref, out=log.out):
    """Streams the logs for a build if available.

    Regardless of whether logs are available for streaming, awaits build
    completion before returning.

    Args:
      build_ref: Build reference, The build whose logs shall be streamed.
      out: The output stream to write the logs to.

    Raises:
      NoLogsBucketException: If the build is expected to specify a logsBucket
      but does not.

    Returns:
      Build message, the completed or terminated build.
    """
    build = self.GetBuild(build_ref)

    logging_modes = self.messages.BuildOptions.LoggingValueValuesEnum
    gcl_only_modes = (
        logging_modes.STACKDRIVER_ONLY,
        logging_modes.CLOUD_LOGGING_ONLY,
    )
    # Modes under which the logs are not read from a GCS bucket.
    non_gcs_modes = (logging_modes.NONE,) + gcl_only_modes

    if not build.options or build.options.logging not in non_gcs_modes:
      log_tailer = GCSLogTailer.FromBuild(build, out=out)
    elif build.options.logging in gcl_only_modes and self.support_gcl:
      log.info('Streaming logs from GCL: requested logging mode is {0}.'.format(
          build.options.logging))
      log_tailer = GCLLogTailer.FromBuild(build, out=out)
    else:
      log.info('Not streaming logs: requested logging mode is {0}.'.format(
          build.options.logging))
      log_tailer = None

    build_statuses = self.messages.Build.StatusValueValuesEnum
    working_statuses = [build_statuses.QUEUED, build_statuses.WORKING]

    # Tail in a background thread while this thread polls the build status.
    tailer_thread = None
    if log_tailer:
      tailer_thread = ThreadInterceptor(target=log_tailer.Tail)
      tailer_thread.start()

    build = self.ShouldStopTailer(build, build_ref, log_tailer,
                                  working_statuses)

    if tailer_thread:
      tailer_thread.join()
      # Surface an error captured inside the tailing thread.
      if tailer_thread.exception is not None:
        raise tailer_thread.exception

    return build

  def PrintLog(self, build_ref):
    """Print the logs for a build.

    Args:
      build_ref: Build reference, The build whose logs shall be streamed.

    Raises:
      NoLogsBucketException: If the build does not specify a logsBucket.
    """
    build = self.GetBuild(build_ref)

    logging_modes = self.messages.BuildOptions.LoggingValueValuesEnum
    gcl_only_modes = (
        logging_modes.STACKDRIVER_ONLY,
        logging_modes.CLOUD_LOGGING_ONLY,
    )
    # Modes under which the logs are not read from a GCS bucket.
    non_gcs_modes = (logging_modes.NONE,) + gcl_only_modes

    if not build.options or build.options.logging not in non_gcs_modes:
      log_tailer = GCSLogTailer.FromBuild(build)
    elif build.options.logging in gcl_only_modes:
      log.info('Printing logs from GCL: requested logging mode is {0}.'.format(
          build.options.logging))
      log_tailer = GCLLogTailer.FromBuild(build)
    else:
      log.info('Logs not available: build logging mode is {0}.'.format(
          build.options.logging))
      log_tailer = None

    if log_tailer:
      log_tailer.Print()