HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/googlecloudsdk/command_lib/dataflow/dataflow_util.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for building the dataflow CLI."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import json
import re

from apitools.base.py import exceptions
from apitools.base.py import list_pager

from googlecloudsdk.api_lib.dataflow import apis
from googlecloudsdk.api_lib.dataflow import exceptions as dataflow_exceptions

from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources

# Regular expression to match only metrics from Dataflow. Currently, this should
# match at least "dataflow" and "dataflow/v1b3". User metrics have an origin set
# as /^user/.
DATAFLOW_METRICS_RE = re.compile('^dataflow')

# Default region used when the caller does not supply --region; re-exported
# from the API layer so command code only needs to import this module.
DATAFLOW_API_DEFAULT_REGION = apis.DATAFLOW_API_DEFAULT_REGION

# Regular expression to only match watermark metrics.
WINDMILL_WATERMARK_RE = re.compile('^(.*)-windmill-(.*)-watermark')

# Resource-registry collection path for regional Dataflow jobs; used by the
# URI helpers below to build job self-links.
JOBS_COLLECTION = 'dataflow.projects.locations.jobs'

# Help-text fragment describing the region fallback behavior.
DEFAULT_REGION_MESSAGE = 'Defaults to \'{0}\'.'.format(
    DATAFLOW_API_DEFAULT_REGION)


def GetErrorMessage(error):
  """Pull the human-readable message out of an HTTPError.

  Args:
    error: The error exceptions.HttpError thrown by the API client.

  Returns:
    A string describing the error.
  """
  try:
    parsed = json.loads(error.content)
  except ValueError:
    # The payload was not JSON; surface the raw response for debugging.
    log.err.Print(error.response)
    return 'Unknown error'
  return parsed.get('error', {}).get('message', '')


def MakeErrorMessage(error, job_id='', project_id='', region_id=''):
  """Create a standard error message across commands.

  Args:
    error: The error exceptions.HttpError thrown by the API client.
    job_id: The job ID that was used in the command.
    project_id: The project ID that was used in the command.
    region_id: The region ID that was used in the command.

  Returns:
    str, a standard error message.
  """
  # Each identifier contributes a clause only when it was actually supplied.
  parts = ['Failed operation']
  if job_id:
    parts.append(' with job ID [{0}]'.format(job_id))
  if project_id:
    parts.append(' in project [{0}]'.format(project_id))
  if region_id:
    parts.append(' in regional endpoint [{0}]'.format(region_id))
  parts.append(': {0}'.format(GetErrorMessage(error)))
  return ''.join(parts)


def YieldExceptionWrapper(generator, job_id='', project_id='', region_id=''):
  """Re-yield every item from a generator, translating HTTP errors.

  Args:
    generator: The generator whose items should be passed through.
    job_id: The job ID that was used in the command.
    project_id: The project ID that was used in the command.
    region_id: The region ID that was used in the command.

  Yields:
    The generated object.

  Raises:
    dataflow_exceptions.ServiceException: An exception for errors raised by
      the service.
  """
  try:
    for item in generator:
      yield item
  except exceptions.HttpError as http_error:
    # Convert the raw apitools error into the standard service exception.
    raise dataflow_exceptions.ServiceException(
        MakeErrorMessage(http_error, job_id, project_id, region_id))


def YieldFromList(service,
                  request,
                  limit=None,
                  batch_size=100,
                  field='items',
                  batch_size_attribute='maxResults',
                  predicate=None,
                  job_id='',
                  project_id='',
                  region_id=''):
  """Returns a wrapped list_pager.YieldFromList to catch any exceptions.

  Args:
    service: apitools_base.BaseApiService, A service with a .List() method.
    request: protorpc.messages.Message, The request message corresponding to the
      service's .List() method, with all the attributes populated except the
      .maxResults and .pageToken attributes.
    limit: int, The maximum number of records to yield. None if all available
      records should be yielded.
    batch_size: int, The number of items to retrieve per request.
    field: str, The field in the response that will be a list of items.
    batch_size_attribute: str, The name of the attribute in a response message
      holding the maximum number of results to be returned. None if
      caller-specified batch size is unsupported.
    predicate: lambda, A function that returns true for items to be yielded.
    job_id: The job ID that was used in the command.
    project_id: The project ID that was used in the command.
    region_id: The region ID that was used in the command.

  Returns:
    The wrapped generator.

  Raises:
    dataflow_exceptions.ServiceException: if list request failed.
  """
  # Without a regional endpoint, fall back to the cross-region Aggregated
  # listing method.
  list_method = 'List' if region_id else 'Aggregated'

  paged = list_pager.YieldFromList(
      service=service,
      request=request,
      limit=limit,
      batch_size=batch_size,
      field=field,
      batch_size_attribute=batch_size_attribute,
      predicate=predicate,
      method=list_method)
  return YieldExceptionWrapper(paged, job_id, project_id, region_id)


def JobsUriFunc(resource):
  """Transform a job resource into a URL string.

  Args:
    resource: The DisplayInfo job object

  Returns:
    URL to the job
  """
  # The project is resolved lazily from gcloud config; the location comes
  # straight off the resource.
  ref_params = {
      'projectId': properties.VALUES.core.project.GetOrFail,
      'location': resource.location,
  }
  job_ref = resources.REGISTRY.Parse(
      resource.id, params=ref_params, collection=JOBS_COLLECTION)
  return job_ref.SelfLink()


def JobsUriFromId(job_id, region_id):
  """Transform a job ID into a URL string.

  Args:
    job_id: The job ID
    region_id: Region ID of the job's regional endpoint.

  Returns:
    URL to the job
  """
  # The project is resolved lazily from gcloud config at parse time.
  ref_params = {
      'projectId': properties.VALUES.core.project.GetOrFail,
      'location': region_id,
  }
  job_ref = resources.REGISTRY.Parse(
      job_id, params=ref_params, collection=JOBS_COLLECTION)
  return job_ref.SelfLink()


# TODO(b/139889563): Remove this method when args region is changed to required
# TODO(b/139889563): Remove this method when args region is changed to required
def GetRegion(args):
  """Get region to be used in Dataflow services.

  Args:
    args: Argument passed in when running gcloud dataflow command

  Returns:
    Region specified by user from --region flag in args, then fall back to
    'us-central1'.
  """
  if args.region:
    return args.region

  # Warn on every fallback so users migrate before the flag becomes required.
  log.warning('`--region` not set; defaulting to \'{0}\'. In an upcoming '
              'release, users must specify a region explicitly. See https://'
              'cloud.google.com/dataflow/docs/concepts/regional-endpoints '
              'for additional details.'.format(DATAFLOW_API_DEFAULT_REGION))
  return DATAFLOW_API_DEFAULT_REGION