File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/dataproc/jobs/base.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for building the dataproc clusters CLI."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import collections
import os

from apitools.base.py import encoding
from googlecloudsdk.api_lib.dataproc import constants
from googlecloudsdk.api_lib.dataproc import exceptions
from googlecloudsdk.api_lib.dataproc import storage_helpers
from googlecloudsdk.core import log
from googlecloudsdk.core.util import files
import six
import six.moves.urllib.parse


class JobBase(six.with_metaclass(abc.ABCMeta, object)):
  """Base class for Jobs."""

  def __init__(self, *args, **kwargs):
    super(JobBase, self).__init__(*args, **kwargs)
    self.files_by_type = {}
    self.files_to_stage = []
    self._staging_dir = None

  def _GetStagedFile(self, file_str):
    """Validate file URI and register it for uploading if it is local."""
    drive, _ = os.path.splitdrive(file_str)
    uri = six.moves.urllib.parse.urlsplit(file_str, allow_fragments=False)
    # The file is considered local to this machine if it has no scheme other
    # than a Windows drive letter; file:// URIs are interpreted as living on
    # the cluster's VMs rather than here.
    is_local = drive or not uri.scheme
    if not is_local:
      # Non-local files are already staged.
      # TODO(b/36057257): Validate scheme.
      return file_str

    if not os.path.exists(file_str):
      raise files.Error('File Not Found: [{0}].'.format(file_str))
    if self._staging_dir is None:
      # We raise this exception only if there are local files to stage but the
      # staging location couldn't be determined; when all files are already
      # staged remotely, it is never reached.
      raise exceptions.ArgumentError(
          'Could not determine where to stage local file {0}. When submitting '
          'a job to a cluster selected via --cluster-labels, either\n'
          '- a staging bucket must be provided via the --bucket argument, or\n'
          '- all provided files must be non-local.'.format(file_str))

    basename = os.path.basename(file_str)
    self.files_to_stage.append(file_str)
    staged_file = six.moves.urllib.parse.urljoin(self._staging_dir, basename)
    return staged_file
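
  # Editor's sketch (not part of the SDK source): the locality check above
  # classifies paths like so:
  #   six.moves.urllib.parse.urlsplit('gs://bucket/job.py').scheme -> 'gs'
  #     (has a scheme: already staged; returned unchanged)
  #   six.moves.urllib.parse.urlsplit('/tmp/job.py').scheme -> ''
  #     (no scheme: local; registered for upload)
  #   os.path.splitdrive(r'C:\jobs\job.py')[0] -> 'C:' (on Windows)
  #     (a drive letter marks the file local, even though urlsplit would
  #      otherwise parse 'c' as a scheme)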

  def ValidateAndStageFiles(self):
    """Validate file URIs and upload them if they are local."""
    for file_type, file_or_files in six.iteritems(self.files_by_type):
      # TODO(b/36049793): Validate file suffixes.
      if not file_or_files:
        continue
      elif isinstance(file_or_files, six.string_types):
        self.files_by_type[file_type] = self._GetStagedFile(file_or_files)
      else:
        staged_files = [self._GetStagedFile(f) for f in file_or_files]
        self.files_by_type[file_type] = staged_files

    if self.files_to_stage:
      log.info('Staging local files {0} to {1}.'.format(self.files_to_stage,
                                                        self._staging_dir))
      storage_helpers.Upload(self.files_to_stage, self._staging_dir)
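
  # A sketch of the expected shape (hypothetical values, for illustration
  # only): a PySpark job might populate
  #   self.files_by_type = {'py_file': '/tmp/wordcount.py',
  #                         'archives': ['gs://bucket/deps.zip']}
  # ValidateAndStageFiles() then rewrites the local entry to its staged gs://
  # URI and uploads it via storage_helpers.Upload; the remote entry is left
  # untouched.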

  def GetStagingDir(self, cluster, cluster_pool, job_id, bucket=None):
    """Determine the GCS directory to stage job resources in."""
    if bucket is None and cluster is None:
      return None
    if bucket is None:
      # If bucket is not provided, fall back to cluster's staging bucket.
      if cluster.config:
        bucket = cluster.config.configBucket
      elif cluster.virtualClusterConfig:
        bucket = cluster.virtualClusterConfig.stagingBucket
      else:
        # A staging bucket is only needed if the request stages local files;
        # if it doesn't, everything still works. If it does, staging fails
        # later with a message saying --bucket should be specified.
        return None
    environment = 'unresolved'
    if cluster is not None:
      environment = cluster.clusterUuid
    if cluster_pool is not None:
      environment = cluster_pool
    staging_dir = (
        'gs://{bucket}/{prefix}/{environment}/jobs/{job_id}/staging/'.format(
            bucket=bucket,
            prefix=constants.GCS_METADATA_PREFIX,
            environment=environment,
            job_id=job_id))
    return staging_dir
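
  # Worked example (bucket, UUID, and job id are hypothetical): the format
  # string above yields a path of the form
  #   gs://my-bucket/<GCS_METADATA_PREFIX>/<cluster-uuid>/jobs/job-1234/staging/
  # When a cluster pool is given, the pool name replaces the cluster UUID in
  # the <environment> segment.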

  def BuildLoggingConfig(self, messages, driver_logging):
    """Build LoggingConfig from parameters."""
    if not driver_logging:
      return None

    value_enum = (messages.LoggingConfig.DriverLogLevelsValue.
                  AdditionalProperty.ValueValueValuesEnum)
    config = collections.OrderedDict(
        [(key, value_enum(value)) for key, value in driver_logging.items()])
    return messages.LoggingConfig(
        driverLogLevels=encoding.DictToAdditionalPropertyMessage(
            config,
            messages.LoggingConfig.DriverLogLevelsValue))
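
  # For example (values are hypothetical):
  #   --driver-log-levels root=FATAL,com.example=INFO
  # reaches this method as {'root': 'FATAL', 'com.example': 'INFO'}; each value
  # is coerced to the DriverLogLevelsValue enum and packed into the repeated
  # AdditionalProperty form the API expects.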

  def PopulateFilesByType(self, args):
    """Populates files_by_type from the parsed args via GetFilesByType."""
    self.files_by_type.update(self.GetFilesByType(args))
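

# A minimal, hypothetical subclass sketch (an editor's illustration, not part
# of this module) showing the contract PopulateFilesByType relies on: each
# concrete job type implements GetFilesByType(args) and returns a dict mapping
# file roles to URI(s). The argument field names below are assumptions.
#
#   class PySparkJobBase(JobBase):
#
#     @staticmethod
#     def GetFilesByType(args):
#       return {'py_file': args.py_file, 'py_files': args.py_files}
#
#   job = PySparkJobBase()
#   job.PopulateFilesByType(args)
#   job._staging_dir = job.GetStagingDir(
#       cluster, None, job_id, bucket=args.bucket)
#   job.ValidateAndStageFiles()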