HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/ai/log_util.py
# -*- coding: utf-8 -*- #
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for interacting with streaming logs."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import copy

from apitools.base.py import encoding
from googlecloudsdk.command_lib.logs import stream
import six

# Output format expression listing the four keys of the log dictionaries built
# by _EntryToDict; the timestamp is rendered in the local time zone.
# NOTE(review): presumably consumed as a gcloud `--format` value by the calling
# commands -- confirm against callers.
LOG_FORMAT = (
    'value(severity,'
    'timestamp.date("%Y-%m-%d %H:%M:%S %z",tz="LOCAL"), '
    'task_name,message)')

# Passed to stream.LogFetcher as `continue_interval` by StreamLogs.
# NOTE(review): unit and exact meaning are defined by LogFetcher, not here.
_CONTINUE_INTERVAL = 10


def StreamLogs(name, continue_function, polling_interval, task_name,
               allow_multiline):
  """Streams the logs of the job with the given id.

  Builds a log fetcher restricted to the job (and, when given, the task) and
  hands its output to _SplitMultiline.

  Args:
    name: string id of the entity.
    continue_function: One-arg function that takes in the number of empty polls
      and outputs a boolean to decide if we should keep polling or not. If not
      given, keep polling indefinitely.
    polling_interval: amount of time to sleep between each poll.
    task_name: String name of task.
    allow_multiline: Tells us if logs with multiline messages are okay or not.

  Returns:
    The generator produced by _SplitMultiline over the fetched log entries.
  """
  job_filters = _LogFilters(name, task_name=task_name)
  fetcher = stream.LogFetcher(
      filters=job_filters,
      polling_interval=polling_interval,
      continue_interval=_CONTINUE_INTERVAL,
      continue_func=continue_function)
  fetched_entries = fetcher.YieldLogs()
  return _SplitMultiline(fetched_entries, allow_multiline)


def _LogFilters(name, task_name):
  """Builds the filter expressions selecting the logs of one job.

  Args:
    name: string id of the entity.
    task_name: String name of task. When falsy, no task filter is added.

  Returns:
    A list of filters to be passed to the logging API.
  """
  job_filters = ['resource.type="ml_job"']
  job_filters.append('resource.labels.job_id="{0}"'.format(name))
  if not task_name:
    return job_filters
  job_filters.append('resource.labels.task_name="{0}"'.format(task_name))
  return job_filters


def _SplitMultiline(log_generator, allow_multiline=False):
  """Converts log entries to dicts, optionally one dict per message line.

  Args:
    log_generator: iterator that returns an ml log entry; each entry is
      converted with _EntryToDict.
    allow_multiline: Tells us if logs with multiline messages are okay or not.

  Yields:
    When allow_multiline is set, the converted log dictionary unchanged.
    Otherwise one deep copy of the dictionary per line of its message; a
    message without any line still yields a single copy with an empty message.
  """
  for raw_entry in log_generator:
    entry = _EntryToDict(raw_entry)
    lines = entry['message'].splitlines()
    if allow_multiline:
      yield entry
      continue
    # An empty message must still produce exactly one output record.
    for line in lines or ['']:
      per_line_entry = copy.deepcopy(entry)
      per_line_entry['message'] = line
      yield per_line_entry


def _EntryToDict(log_entry):
  """Converts a log entry to a dictionary.

  Args:
    log_entry: Log entry exposing the `severity`, `timestamp`, `resource`,
      `jsonPayload` and `textPayload` attributes.

  Returns:
    A dict with the keys 'severity', 'timestamp', 'task_name' and 'message'.
    'severity' falls back to 'DEFAULT' when the entry has none. 'message' is
    always a text string: the 'message' field of the JSON payload when there
    is a JSON payload, else the text payload, else the empty string.
  """
  output = {}
  output['severity'] = (
      log_entry.severity.name if log_entry.severity else 'DEFAULT')
  output['timestamp'] = log_entry.timestamp
  output['task_name'] = _GetTaskName(log_entry)

  message = ''
  if log_entry.jsonPayload is not None:
    # 'message' contains a free-text message that we want to pull out of the
    # JSON; every other field of the payload is ignored.
    payload_message = _ToDict(log_entry.jsonPayload).get('message')
    if payload_message:
      # JSON values are not necessarily strings (numbers, lists, nested
      # structs); coerce to text like the text-payload branch does so that
      # callers can always treat 'message' as a string.
      message = six.text_type(payload_message)
  elif log_entry.textPayload is not None:
    message = six.text_type(log_entry.textPayload)
  output['message'] = message
  return output


def _GetTaskName(log_entry):
  """Returns the task name stored in the entry's resource labels.

  Args:
    log_entry: Log entry whose `resource.labels` may hold a 'task_name' label.

  Returns:
    The 'task_name' label, or 'unknown_task' when the entry has no resource
    or the label is missing or empty.
  """
  if log_entry.resource:
    labels = _ToDict(log_entry.resource.labels)
  else:
    labels = {}
  return labels.get('task_name') or 'unknown_task'


def _ToDict(message):
  """Returns `message` as a dictionary.

  Args:
    message: A dict, a message object accepted by encoding.MessageToDict, or
      a falsy value.

  Returns:
    A new empty dict for any falsy input, the very same object when `message`
    is already a dict, and otherwise the result of encoding.MessageToDict.
  """
  if not message:
    return {}
  already_dict = isinstance(message, dict)
  return message if already_dict else encoding.MessageToDict(message)