HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/ml_engine/log_utils.py
# -*- coding: utf-8 -*- #
# Copyright 2017 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for the ml-engine  client to use command_lib.logs.stream."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import copy

from apitools.base.py import encoding

from googlecloudsdk.api_lib.ml_engine import jobs
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
import six

# Format string for streamed ml-engine log records: a `value(...)` projection
# over the severity, timestamp (rendered via date() with tz="LOCAL"),
# task_name, trial_id and message fields, in that order.
LOG_FORMAT = 'value({0})'.format(
    'severity,'
    'timestamp.date("%Y-%m-%d %H:%M:%S %z",tz="LOCAL"), '
    'task_name,'
    'trial_id,'
    'message')


# TODO(b/36057459): Remove ml_job after transition from ml_job to cloudml_job is
# done. See b/34459608.
def LogFilters(job_id, task_name=None):
  """Builds the filter clauses used to fetch the logs of one job.

  Args:
    job_id: String id of job.
    task_name: String name of task. When empty or None, no task clause is
      added.

  Returns:
    A list of filter strings to be passed to the logging API: a resource type
    clause, a job id clause and, if task_name is given, a task name clause.
  """
  resource_type_clause = (
      '(resource.type="ml_job" OR resource.type="cloudml_job")')
  job_clause = 'resource.labels.job_id="{0}"'.format(job_id)
  if not task_name:
    return [resource_type_clause, job_clause]

  # The task name may live on either the resource labels or the entry labels.
  task_clause = (
      '(resource.labels.task_name="{0}" OR labels.task_name="{0}")'.format(
          task_name))
  return [resource_type_clause, job_clause, task_clause]


def MakeContinueFunction(job_id):
  """Builds the predicate that tells the log fetcher whether to keep polling.

  Args:
    job_id: String id of job.

  Returns:
    A one-argument function that decides if the log fetcher should continue.
  """
  client = jobs.JobsClient()
  project = properties.VALUES.core.project.Get(required=True)
  job_ref = resources.REGISTRY.Create(
      'ml.projects.jobs', jobsId=job_id, projectsId=project)

  def ShouldContinue(periods_without_logs):
    """Decides whether log polling should go on.

    The job is only looked up after more than one poll interval has passed
    without new logs; polling stops only when that lookup shows the job has an
    end time.

    Args:
      periods_without_logs: integer number of empty polls.

    Returns:
      True if there has been at most one empty poll or if the job has no end
      time yet, False otherwise.
    """
    # Short-circuit keeps the job lookup from running until the second empty
    # poll.
    return (periods_without_logs <= 1 or
            client.Get(job_ref).endTime is None)

  return ShouldContinue


def SplitMultiline(log_generator, allow_multiline=False):
  """Converts log entries to dicts, optionally one dict per message line.

  Args:
    log_generator: iterator that returns ml log entries.
    allow_multiline: If True, each entry is yielded once with its message
      untouched. If False, an entry is yielded once per line of its message.

  Yields:
    ml log dictionaries; single-line ones unless allow_multiline is set.
  """
  for entry in log_generator:
    entry_dict = _EntryToDict(entry)
    if allow_multiline:
      yield entry_dict
      continue

    # An empty message still produces exactly one (empty-message) record.
    lines = entry_dict['message'].splitlines() or ['']
    for line in lines:
      per_line_entry = copy.deepcopy(entry_dict)
      per_line_entry['message'] = line
      yield per_line_entry


def _EntryToDict(log_entry):
  """Converts a log entry to a dictionary.

  Args:
    log_entry: Log entry exposing severity, timestamp, labels, resource and
      the jsonPayload / textPayload / protoPayload fields.

  Returns:
    A dict with 'severity', 'timestamp', 'task_name' and 'message' keys, plus
    'trial_id' when the entry's labels carry one and 'json' when the entry has
    a JSON or proto payload.
  """
  output = {}
  output[
      'severity'] = log_entry.severity.name if log_entry.severity else 'DEFAULT'
  output['timestamp'] = log_entry.timestamp
  label_attributes = _GetLabelAttributes(log_entry)
  output['task_name'] = label_attributes['task_name']
  if 'trial_id' in label_attributes:
    output['trial_id'] = label_attributes['trial_id']
  output['message'] = ''
  if log_entry.jsonPayload is not None:
    # Work on a shallow copy: _ToDict returns the payload itself when it is
    # already a dict, and the removals below must not alter the caller's entry.
    json_data = dict(_ToDict(log_entry.jsonPayload))
    # 'message' contains a free-text message that we want to pull out of the
    # JSON.
    message = json_data.pop('message', None)
    if message:
      # A JSON payload may hold a non-string 'message'; coerce it to text the
      # same way the textPayload branch does instead of raising TypeError.
      output['message'] += six.text_type(message)
    # Don't put 'levelname' in the JSON, since it duplicates the
    # information in log_entry.severity.name
    json_data.pop('levelname', None)
    output['json'] = json_data
  elif log_entry.textPayload is not None:
    output['message'] += six.text_type(log_entry.textPayload)
  elif log_entry.protoPayload is not None:
    output['json'] = encoding.MessageToDict(log_entry.protoPayload)
  return output


def _GetLabelAttributes(log_entry):
  """Extracts the task name and, when present, the trial id of a log entry.

  The task name is the first of the following that is not None: the resource
  label 'task_name', the entry label 'task_name', the entry label
  'ml.googleapis.com/task_name'. The trial id is looked up the same way in the
  entry labels 'trial_id' and 'ml.googleapis.com/trial_id'.

  Args:
    log_entry: Log entry whose labels and resource labels are inspected.

  Returns:
    A dict with a 'task_name' key ('unknown_task' when no label provides one)
    and a 'trial_id' key only when a trial id label was found.
  """
  entry_labels = _ToDict(log_entry.labels)
  if log_entry.resource:
    resource_labels = _ToDict(log_entry.resource.labels)
  else:
    resource_labels = {}

  # Candidates are listed in priority order; the first non-None one wins.
  task_name_candidates = (
      resource_labels.get('task_name'),
      entry_labels.get('task_name'),
      entry_labels.get('ml.googleapis.com/task_name'),
  )
  attributes = {
      'task_name': next(
          (name for name in task_name_candidates if name is not None),
          'unknown_task'),
  }

  trial_id_candidates = (
      entry_labels.get('trial_id'),
      entry_labels.get('ml.googleapis.com/trial_id'),
  )
  trial_id = next(
      (value for value in trial_id_candidates if value is not None), None)
  if trial_id is not None:
    attributes['trial_id'] = trial_id

  return attributes


def _ToDict(message):
  if not message:
    return {}
  if isinstance(message, dict):
    return message
  else:
    return encoding.MessageToDict(message)