Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
File: //snap/google-cloud-cli/396/lib/third_party/ml_sdk/cloud/ml/util/_file.py
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Some file reading utilities.
"""

import glob
import logging
import os
import shutil
import subprocess
import time

from apache_beam.io import gcsio


# TODO(user): Remove use of gsutil


def create_directory(path):
  """Creates a local directory.

  Does nothing if a Google Cloud Storage path is passed.

  Args:
    path: the path to create.

  Raises:
    ValueError: if path is a file or os.makedirs fails.
  """
  if path.startswith('gs://'):
    return
  if os.path.isdir(path):
    return
  if os.path.isfile(path):
    raise ValueError('Unable to create location. "%s" exists and is a file.' %
                     path)

  try:
    os.makedirs(path)
  except OSError as e:
    raise ValueError('Unable to create location "%s": %s' % (path, e))


def open_local_or_gcs(path, mode):
  """Opens the given path."""
  if path.startswith('gs://'):
    try:
      return gcsio.GcsIO().open(path, mode)
    except Exception as e:  # pylint: disable=broad-except
      # Currently we retry exactly once, to work around flaky gcs calls.
      logging.error('Retrying after exception opening GCS file: %s', e)
      time.sleep(10)
      return gcsio.GcsIO().open(path, mode)
  else:
    return open(path, mode)


def file_exists(path):
  """Returns whether the file exists."""
  if path.startswith('gs://'):
    return gcsio.GcsIO().exists(path)
  else:
    return os.path.exists(path)


def copy_file(src, dest):
  """Copy a file from src to dest.

  Supports local and Google Cloud Storage.

  Args:
    src: source path.
    dest: destination path.
  """
  with open_local_or_gcs(src, 'r') as h_src:
    with open_local_or_gcs(dest, 'w') as h_dest:
      shutil.copyfileobj(h_src, h_dest)


def write_file(path, data):
  """Writes data to a file.

  Supports local and Google Cloud Storage.

  Args:
    path: output file path.
    data: data to write to file.
  """
  with open_local_or_gcs(path, 'w') as h_dest:
    h_dest.write(data)  # pylint: disable=no-member


def load_file(path):
  """Returns the full contents of a local or Google Cloud Storage file."""
  if path.startswith('gs://'):
    content = _read_cloud_file(path)
  else:
    content = _read_local_file(path)

  if content is None:
    raise ValueError('File cannot be loaded from %s' % path)

  return content


def glob_files(path):
  """Expands a glob pattern locally or on Google Cloud Storage."""
  if path.startswith('gs://'):
    return gcsio.GcsIO().glob(path)
  else:
    return glob.glob(path)


def _read_local_file(local_path):
  with open(local_path, 'r') as f:
    return f.read()


def _read_cloud_file(storage_path):
  with open_local_or_gcs(storage_path, 'r') as src:
    return src.read()


def read_file_stream(file_list):
  """Yields lines from one or more local or Google Cloud Storage files."""
  if isinstance(file_list, basestring):
    file_list = [file_list]
  for path in file_list:
    if path.startswith('gs://'):
      for line in _read_cloud_file_stream(path):
        yield line
    else:
      for line in _read_local_file_stream(path):
        yield line


def _read_local_file_stream(path):
  with open(path) as file_in:
    for line in file_in:
      yield line


def _read_cloud_file_stream(path):
  read_file = subprocess.Popen(
      ['gsutil', 'cp', path, '-'],
      stdout=subprocess.PIPE,
      stderr=subprocess.PIPE)
  with read_file.stdout as file_in:
    for line in file_in:
      yield line
  if read_file.wait() != 0:
    raise IOError('Unable to read data from Google Cloud Storage: "%s"' % path)
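

# A minimal usage sketch for the helpers above, assuming the module is
# importable as `_file` (the import path below is inferred from the directory
# layout and may differ in practice); the local path and bucket name are
# placeholders chosen for illustration only.
from cloud.ml.util import _file  # assumed import path

local_path = '/tmp/example.txt'                 # placeholder local file
cloud_path = 'gs://example-bucket/example.txt'  # placeholder GCS object

_file.write_file(local_path, 'hello\n')      # write a local file
if _file.file_exists(local_path):
  _file.copy_file(local_path, cloud_path)    # copy it to GCS via gcsio
  print(_file.load_file(cloud_path))         # read the copy back as a string

# Stream lines from several files, local or GCS, in a single pass.
for line in _file.read_file_stream([local_path, cloud_path]):
  print(line)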