File: //snap/google-cloud-cli/396/lib/googlecloudsdk/command_lib/dataproc/jobs/base.py
# -*- coding: utf-8 -*- #
# Copyright 2015 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for building the dataproc clusters CLI."""
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import abc
import collections
import os

from apitools.base.py import encoding
from googlecloudsdk.api_lib.dataproc import constants
from googlecloudsdk.api_lib.dataproc import exceptions
from googlecloudsdk.api_lib.dataproc import storage_helpers
from googlecloudsdk.core import log
from googlecloudsdk.core.util import files

import six
import six.moves.urllib.parse


class JobBase(six.with_metaclass(abc.ABCMeta, object)):
  """Base class for Jobs."""

  def __init__(self, *args, **kwargs):
    super(JobBase, self).__init__(*args, **kwargs)
    # Maps each file-argument type (e.g. the main file, jar files, py files)
    # to the URI(s) supplied for it: a single URI string or a list of URIs.
    self.files_by_type = {}
    # Local files queued for upload to the staging directory.
    self.files_to_stage = []
    # GCS directory (gs://...) to stage local files in, if one was resolved.
    self._staging_dir = None

  def _GetStagedFile(self, file_str):
    """Validate file URI and register it for uploading if it is local."""
    drive, _ = os.path.splitdrive(file_str)
    uri = six.moves.urllib.parse.urlsplit(file_str, allow_fragments=False)
    # Consider the file local to this machine if it has no URI scheme other
    # than a Windows drive letter (splitdrive catches paths like 'C:\foo',
    # which urlsplit would otherwise parse as having scheme 'c'). file://
    # URIs are interpreted as living on the cluster's VMs, not locally.
    is_local = drive or not uri.scheme
    if not is_local:
      # Non-local files are already staged.
      # TODO(b/36057257): Validate scheme.
      return file_str
    if not os.path.exists(file_str):
      raise files.Error('File Not Found: [{0}].'.format(file_str))
    if self._staging_dir is None:
      # This exception is raised only when there are local files to stage but
      # the staging location could not be determined; when all files are
      # already staged (non-local), it is never reached.
      raise exceptions.ArgumentError(
          'Could not determine where to stage local file {0}. When submitting '
          'a job to a cluster selected via --cluster-labels, either\n'
          '- a staging bucket must be provided via the --bucket argument, or\n'
          '- all provided files must be non-local.'.format(file_str))
    basename = os.path.basename(file_str)
    self.files_to_stage.append(file_str)
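    # Note: self._staging_dir always ends with a trailing slash, so urljoin
    # appends the basename instead of replacing the last path component.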
    staged_file = six.moves.urllib.parse.urljoin(self._staging_dir, basename)
    return staged_file
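
  # Illustrative sketch of _GetStagedFile (hypothetical bucket and paths):
  # with self._staging_dir == 'gs://my-bucket/.../jobs/job-1/staging/',
  # _GetStagedFile('/tmp/wordcount.py') queues the local file for upload and
  # returns 'gs://my-bucket/.../jobs/job-1/staging/wordcount.py', while
  # _GetStagedFile('gs://elsewhere/wordcount.py') is returned unchanged.
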
  def ValidateAndStageFiles(self):
    """Validate file URIs and upload them if they are local."""
    for file_type, file_or_files in six.iteritems(self.files_by_type):
      # TODO(b/36049793): Validate file suffixes.
      if not file_or_files:
        continue
      elif isinstance(file_or_files, six.string_types):
        self.files_by_type[file_type] = self._GetStagedFile(file_or_files)
      else:
        staged_files = [self._GetStagedFile(f) for f in file_or_files]
        self.files_by_type[file_type] = staged_files
    if self.files_to_stage:
      log.info('Staging local files {0} to {1}.'.format(self.files_to_stage,
                                                        self._staging_dir))
      storage_helpers.Upload(self.files_to_stage, self._staging_dir)
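
  # Sketch of the transformation above (hypothetical entry): a mapping like
  # {'py files': ['/tmp/util.py', 'gs://bucket/lib.py']} becomes
  # {'py files': ['<staging_dir>util.py', 'gs://bucket/lib.py']}, and only
  # the local '/tmp/util.py' is uploaded via storage_helpers.Upload.
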
  def GetStagingDir(self, cluster, cluster_pool, job_id, bucket=None):
    """Determine the GCS directory to stage job resources in."""
    if bucket is None and cluster is None:
      return None
    if bucket is None:
      # If bucket is not provided, fall back to cluster's staging bucket.
      if cluster.config:
        bucket = cluster.config.configBucket
      elif cluster.virtualClusterConfig:
        bucket = cluster.virtualClusterConfig.stagingBucket
      else:
        # A staging bucket is only needed when the request has local files to
        # stage. If it does not, everything works without one; if it does,
        # staging fails with a message saying --bucket must be specified.
        return None
    # Prefer the cluster pool as the environment identifier, then fall back
    # to the cluster's UUID, then to a literal 'unresolved' placeholder.
    environment = 'unresolved'
    if cluster is not None:
      environment = cluster.clusterUuid
    if cluster_pool is not None:
      environment = cluster_pool
    staging_dir = (
        'gs://{bucket}/{prefix}/{environment}/jobs/{job_id}/staging/'.format(
            bucket=bucket,
            prefix=constants.GCS_METADATA_PREFIX,
            environment=environment,
            job_id=job_id))
    return staging_dir
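
  # For illustration only (assuming constants.GCS_METADATA_PREFIX resolves
  # to its usual 'google-cloud-dataproc-metainfo' value), the result is e.g.
  # gs://my-bucket/google-cloud-dataproc-metainfo/<uuid>/jobs/job-1/staging/
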
  def BuildLoggingConfig(self, messages, driver_logging):
    """Build LoggingConfig from parameters."""
    if not driver_logging:
      return None
    value_enum = (messages.LoggingConfig.DriverLogLevelsValue.
                  AdditionalProperty.ValueValueValuesEnum)
    config = collections.OrderedDict(
        [(key, value_enum(value)) for key, value in driver_logging.items()])
    return messages.LoggingConfig(
        driverLogLevels=encoding.DictToAdditionalPropertyMessage(
            config,
            messages.LoggingConfig.DriverLogLevelsValue))
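
  # Illustrative input (hypothetical log levels): a driver_logging dict of
  # {'root': 'INFO', 'com.example': 'DEBUG'} yields a LoggingConfig whose
  # driverLogLevels message holds one AdditionalProperty per entry, with
  # each value coerced through ValueValueValuesEnum.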

  def PopulateFilesByType(self, args):
    """Records the job's file arguments, keyed by type, for later staging."""
    self.files_by_type.update(self.GetFilesByType(args))