File: //snap/google-cloud-cli/394/lib/googlecloudsdk/command_lib/ml_engine/local_train.py
# -*- coding: utf-8 -*- #
# Copyright 2016 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for running training jobs locally."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import atexit
import json
import os
import subprocess

from googlecloudsdk.core import execution_utils
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import files
from six.moves import range


def GetPrimaryNodeName():
  """Get the primary node name.

  Returns:
    str, the name of the primary node. If running in tensorflow 1.x,
    return 'master'. If running in tensorflow 2.x, return 'chief'.
    If tensorflow is not installed in local envrionment, it will return
    the default name 'chief'.
  Raises:
    ValueError: if there is no python executable on the user system thrown by
      execution_utils.GetPythonExecutable.
  """
  exe_override = properties.VALUES.ml_engine.local_python.Get()
  python_executable = (
      exe_override or files.FindExecutableOnPath('python') or
      execution_utils.GetPythonExecutable())
  cmd = [python_executable,
         '-c',
         'import tensorflow as tf; print(tf.version.VERSION)']
  with files.FileWriter(os.devnull) as f:
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=f)
  return_code = proc.wait()
  if return_code != 0:
    log.warning('''
    Cannot import tensorflow under path {}. Using "chief" for cluster setting.
    If this is not intended, please check that tensorflow is installed and that
    the python path used is correct. To change the python path, use
    `gcloud config set ml_engine/local_python $python_path`,
    e.g. gcloud config set ml_engine/local_python /usr/bin/python3'''.format(
        python_executable))
    return 'chief'

  tf_version = proc.stdout.read()
  # On Python 3 the probe output is bytes; decode it to str before comparing.
  if 'decode' in dir(tf_version):
    tf_version = tf_version.decode('utf-8')
  if tf_version.startswith('1.'):
    return 'master'
  elif tf_version.startswith('2.'):
    return 'chief'
  log.warning(
      'Unexpected tensorflow version {}; using the default primary'
      ' node name "chief" for cluster settings.'.format(tf_version))
  return 'chief'
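
# Illustrative note (not part of the original module): the probe above is
# equivalent to running
#
#   python -c "import tensorflow as tf; print(tf.version.VERSION)"
#
# with the configured interpreter and mapping the printed version, e.g.
# '1.15.5' -> 'master', '2.11.0' -> 'chief'; an import failure or any other
# version string falls back to 'chief'.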


def MakeProcess(module_name,
                package_root,
                args=None,
                cluster=None,
                task_type=None,
                index=None,
                **extra_popen_args):
  """Make a Popen object that runs the module, with the correct env.

  If task_type is the primary node type, the command is run via
  execution_utils.Exec and its return value is returned instead of a
  subprocess.Popen object.
  Args:
    module_name: str. Name of the module to run, e.g. trainer.task
    package_root: str. Absolute path to the package root for the module.
      Used as CWD for the subprocess.
    args: [str]. Additional user args. Any relative paths will not work.
    cluster: dict. Cluster configuration dictionary. Suitable for passing to
      tf.train.ClusterSpec.
    task_type: str. Task type of this process. Only relevant if cluster is
      specified.
    index: int. Task index of this process.
    **extra_popen_args: extra args passed to Popen. Used for testing.
  Returns:
    a subprocess.Popen object corresponding to the subprocess, or an int
    corresponding to the return value of the subprocess (if task_type is
    primary).
  Raises:
    ValueError: if there is no python executable on the user's system, thrown
      by execution_utils.GetPythonExecutable.
  """
  if args is None:
    args = []
  exe_override = properties.VALUES.ml_engine.local_python.Get()
  python_executable = (
      exe_override or files.FindExecutableOnPath('python') or
      execution_utils.GetPythonExecutable())
  cmd = [python_executable, '-m', module_name] + args
  config = {
      'job': {'job_name': module_name, 'args': args},
      'task': {'type': task_type, 'index': index} if cluster else {},
      'cluster': cluster or {},
      'environment': 'cloud'
  }
  log.info(('launching training process:\n'
            'command: {cmd}\n config: {config}').format(
                cmd=' '.join(cmd),
                config=json.dumps(config, indent=2, sort_keys=True)))

  env = os.environ.copy()
  # The TF_CONFIG environment variable is used to pass the TensorFlow cluster
  # configuration to the training module; the module-specific arguments are
  # passed as command-line arguments.
  env['TF_CONFIG'] = json.dumps(config)
  if task_type == GetPrimaryNodeName():
    return execution_utils.Exec(
        cmd, env=env, no_exit=True, cwd=package_root, **extra_popen_args)
  else:
    env = encoding.EncodeEnv(env)
    task = subprocess.Popen(
        cmd,
        env=env,
        cwd=package_root,
        **extra_popen_args
    )
    atexit.register(execution_utils.KillSubprocess, task)
    return task
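
# Illustrative sketch (hypothetical values; not part of the original module):
# for a worker task, MakeProcess serializes a TF_CONFIG value roughly like
#
#   {"cluster": {"chief": ["localhost:27182"],
#                "ps": ["localhost:27183"],
#                "worker": ["localhost:27184"]},
#    "task": {"type": "worker", "index": 0},
#    "job": {"job_name": "trainer.task", "args": ["--train-steps=100"]},
#    "environment": "cloud"}
#
# into the child environment before launching it with subprocess.Popen.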


def RunDistributed(module_name,
                   package_root,
                   num_ps,
                   num_workers,
                   num_evaluators,
                   start_port,
                   user_args=None):
  """Create a cluster configuration and start processes for the cluster.

  Args:
    module_name: str. Python module to use as the task.
    package_root: str. Absolute path to the package root of the module.
    num_ps: int. Number of parameter servers.
    num_workers: int. Number of workers.
    num_evaluators: int. Number of evaluators.
    start_port: int. First port for the contiguous block of ports used
      by the cluster.
    user_args: [str]. Additional user args for the task. Any relative paths will
      not work.
  Returns:
    int. The return code of the primary subprocess.
  """
  ports = list(range(start_port, start_port + num_ps + num_workers + 1))
  cluster = {
      GetPrimaryNodeName(): ['localhost:{port}'.format(port=ports[0])],
      'ps': ['localhost:{port}'.format(port=p)
             for p in ports[1:num_ps + 1]],
      'worker': ['localhost:{port}'.format(port=p)
                 for p in ports[num_ps + 1:]]
  }
  for task_type, addresses in cluster.items():
    if task_type != GetPrimaryNodeName():
      for i in range(len(addresses)):
        MakeProcess(module_name,
                    package_root,
                    args=user_args,
                    task_type=task_type,
                    index=i,
                    cluster=cluster)
  for i in range(num_evaluators):
    MakeProcess(module_name,
                package_root,
                args=user_args,
                task_type='evaluator',
                index=i,
                cluster=cluster)
  return MakeProcess(module_name,
                     package_root,
                     args=user_args,
                     task_type=GetPrimaryNodeName(),
                     index=0,
                     cluster=cluster)
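

# Illustrative usage sketch (hypothetical module name, package root, and user
# args; not part of the original file). With num_ps=1, num_workers=2, and
# start_port=27182, and assuming the primary node name resolves to 'chief',
# RunDistributed builds a cluster spec such as
#
#   {'chief':  ['localhost:27182'],
#    'ps':     ['localhost:27183'],
#    'worker': ['localhost:27184', 'localhost:27185']}
#
# then starts the ps/worker/evaluator tasks in the background, runs the
# primary task in the foreground, and returns its exit code.
if __name__ == '__main__':
  retval = RunDistributed(
      'trainer.task',              # hypothetical module to run
      '/path/to/trainer/package',  # hypothetical package root
      num_ps=1,
      num_workers=2,
      num_evaluators=1,
      start_port=27182,
      user_args=['--train-steps=100'])  # hypothetical user args
  log.info('Primary task exited with code {}'.format(retval))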