HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/current/lib/surface/managed_flink/jobs/create.py
# -*- coding: utf-8 -*- #
# Copyright 2024 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Create a Flink job from a Java jar."""

from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

import os

from googlecloudsdk.api_lib.managed_flink import util as flink_util
from googlecloudsdk.api_lib.util import waiter
from googlecloudsdk.calliope import base
from googlecloudsdk.calliope import exceptions
from googlecloudsdk.command_lib.managed_flink import flags
from googlecloudsdk.command_lib.managed_flink import flink_backend
from googlecloudsdk.command_lib.util.args import common_args
from googlecloudsdk.core import exceptions as core_exceptions
from googlecloudsdk.core import log
from googlecloudsdk.core import properties
from googlecloudsdk.core import resources
from googlecloudsdk.core import yaml
from googlecloudsdk.core.util import encoding
from googlecloudsdk.core.util import files
from googlecloudsdk.core.util import platforms


def GetJobType(job_type, job_file):
  """Returns the job type based on the job_type and job_file."""
  if job_type == 'auto':
    job_type = None
    if job_file.endswith('.py'):
      job_type = 'python'
    elif job_file.endswith('.sql'):
      job_type = 'sql'
    elif job_file.endswith('.jar'):
      job_type = 'jar'
    if not job_type:
      raise UnknownJobType(
          'Unable to determine type of job [{}]. Job input files must end in'
      )
  return job_type


def GetInputType(job_file):
  """Returns the input type based on the job_file."""
  input_type = 'file://'
  # format is:
  # ar://<project>/<location>/<repository>/<file/path/version/file.jar>
  if job_file.startswith('ar://') or job_file.startswith('artifactregistry://'):
    input_type = 'ar://'
  return input_type


class UnknownJobType(core_exceptions.Error):
  """Raised when the job type cannot be determined."""


@base.DefaultUniverseOnly
@base.ReleaseTracks(base.ReleaseTrack.ALPHA)
class Create(base.BinaryBackedCommand):
  """Create a Flink job from a Java jar."""

  detailed_help = {
      'EXAMPLES': """
      To create a Flink job from a Java jar, run:

      $ {command} my-job.jar --project=my-project --location=us-central1
      """,
  }

  def _JobSubmitResponseHandler(self, response, job_type, temp_dir, args):
    """Process results of BinaryOperation Execution."""
    if response.stdout and (args.show_output or job_type == 'sql'):
      log.Print(response.stdout)

    if response.stderr:
      log.status.Print(response.stderr)

    if response.failed:
      return None

    jobgraph = os.path.join(temp_dir, 'jobgraph.bin')
    if not os.path.exists(jobgraph):
      return None
    jobspec = os.path.join(temp_dir, 'jobspec.yaml')
    if not os.path.exists(jobspec):
      return None
    with files.FileReader(jobspec) as f:
      jobspec_json = yaml.load(f)

    files_to_upload = list()
    files_to_upload.append(os.path.join(temp_dir, 'jobgraph.bin'))
    for jar in jobspec_json['job']['jars']:
      if jar.startswith('file:'):
        files_to_upload.append(jar[5:])

    dest = flink_backend.Upload(
        files_to_upload,
        os.path.join(args.staging_location, jobspec_json['job']['id']),
    )
    msg = flink_util.GetMessagesModule(self.ReleaseTrack())

    jobspec = msg.JobSpec(
        jobName='{0}'.format(jobspec_json['job']['name']),
        jobGraphUri=dest[os.path.join(temp_dir, 'jobgraph.bin')],
        jarUris=[dest[jar[5:]] for jar in jobspec_json['job']['jars']],
    )

    # Configure optional arguments
    if args.name:
      jobspec.displayName = args.name

    if args.network:
      config = msg.NetworkConfig(vpc=args.network)
      if args.subnetwork:
        config.subnetwork = args.subnetwork
      jobspec.networkConfig = config

    # Configure autotuning mode
    autotuning_config = msg.AutotuningConfig()
    if args.autotuning_mode == 'fixed':
      autotuning_config.fixed = msg.Fixed(parallelism=args.parallelism)
    else:
      autotuning_config.throughputBased = msg.Elastic(
          parallelism=args.min_parallelism,
          minParallelism=args.min_parallelism,
          maxParallelism=args.max_parallelism,
      )
    jobspec.autotuningConfig = autotuning_config

    job = msg.Job(name=jobspec_json['job']['id'], jobSpec=jobspec)
    if args.deployment:
      job.deploymentId = args.deployment

    create = msg.ManagedflinkProjectsLocationsJobsCreateRequest(
        parent='projects/{0}/locations/{1}'.format(
            properties.VALUES.core.project.Get(required=True), args.location
        ),
        jobId=jobspec_json['job']['id'],
        job=job,
    )
    if args.show_output:
      log.Print(create)
    if args.dry_run:
      return response
    flink_client = flink_util.FlinkClient(self.ReleaseTrack())
    create_op = flink_client.client.projects_locations_jobs.Create(create)
    if args.show_output:
      log.Print(create_op)
    log.Print('Create request issued for [{0}]'.format(create.jobId))
    if args.async_submit:
      return response

    create_op_ref = resources.REGISTRY.Parse(
        create_op.name, collection='managedflink.projects.locations.operations'
    )
    waiter.WaitFor(
        waiter.CloudOperationPoller(
            flink_client.client.projects_locations_jobs,
            flink_client.client.projects_locations_operations,
        ),
        create_op_ref,
        'Waiting for operations [{0}] to complete...'.format(create_op.name),
    )
    return response

  @staticmethod
  def Args(parser):
    # Common arguments
    common_args.ProjectArgument(
        help_text_to_overwrite='Project to run the job in.'
    ).AddToParser(parser)
    # Specific arguments
    flags.AddDeploymentArgument(
        parser, help_text_to_overwrite='Deployment to run the job in.'
    )
    flags.AddShowOutputArgument(parser)
    flags.AddDryRunArgument(parser)
    flags.AddAsyncArgument(parser)
    flags.AddMainClassArgument(parser)
    flags.AddExtraJarsArgument(parser)
    flags.AddLocationArgument(parser)
    flags.AddStagingLocationArgument(parser)
    flags.AddAutotuningModeArgument(parser)
    flags.AddJobJarArgument(parser)
    flags.AddJobTypeArgument(parser)
    flags.AddNameArgument(parser)
    flags.AddFixedParallelismArgs(parser)
    flags.AddElasticParallelismArgs(parser)
    flags.AddNetworkConfigArgs(parser)
    flags.AddWorkloadIdentityArgument(parser)
    flags.AddJobArgsCollector(parser)
    flags.AddPythonVirtualEnvArgument(parser)
    flags.AddExtraArchivesArgument(parser)

  def Run(self, args):
    current_os = platforms.OperatingSystem.Current()
    if current_os is platforms.OperatingSystem.WINDOWS:
      raise exceptions.ToolException('Job creation not supported on Windows.')
    # Determing the job_file input method
    input_type = GetInputType(args.job)
    # Make sure the job file exists
    if input_type == 'file://' and not os.path.exists(args.job):
      raise exceptions.InvalidArgumentException(
          'JAR|PY|SQL',
          'Job definition [{0}] does not exist.'.format(args.job),
      )

    # Determine the job type
    job_type = GetJobType(args.job_type, args.job)

    # Make sure both network arguments are set if at least one is present.
    if args.network:
      if not args.subnetwork:
        raise exceptions.InvalidArgumentException(
            'network-config-subnetwork',
            '--network-config-subnetwork must be set if --network-config-vpc is'
            ' set.',
        )
    elif args.subnetwork:
      if not args.network:
        raise exceptions.InvalidArgumentException(
            'network-config-vpc',
            '--network-config-vpc must be set if --network-config-subnetwork is'
            ' set.',
        )

    if args.workload_identity and args.deployment:
      raise exceptions.InvalidArgumentException(
          'workload-identity',
          '--workload-identity cannot be set if --deployment is set.',
      )

    # Validate that autotuning arguments are consistent
    flink_backend.ValidateAutotuning(
        args.autotuning_mode,
        args.min_parallelism,
        args.max_parallelism,
        args.parallelism,
    )

    # Validate the staging location
    if not args.staging_location.startswith('gs://'):
      raise exceptions.InvalidArgumentException(
          'staging-location',
          'Staging location must be of the form gs://<bucket>/<path>.',
      )
    flink_backend.CheckStagingLocation(args.staging_location)

    # Validate the python virtualenv
    if job_type == 'python':
      if not args.python_venv:
        raise exceptions.InvalidArgumentException(
            'python-venv',
            'Python virtualenv must be set if job type is python.',
        )

      if not args.python_venv.startswith('gs://'):
        raise exceptions.InvalidArgumentException(
            'python-venv',
            'Python Virtualenv location must be of the form'
            ' gs://<bucket>/<path>.',
        )

    env = dict()
    env['CLOUDSDK_MANAGEDFLINK_JOB_TYPE'] = job_type
    if job_type == 'python':
      if args.extra_jars:
        env['HADOOP_CLASSPATH'] = ':'.join(args.extra_jars)
    elif job_type == 'jar':
      if args.extra_jars:
        env['HADOOP_CLASSPATH'] = ':'.join(args.extra_jars)

    # If there is a HADOOP_CLASSPATH environment variable, add it to the env
    # variable so it doesn't get overwritten.
    if env.get('HADOOP_CLASSPATH') and encoding.GetEncodedValue(
        os.environ, 'HADOOP_CLASSPATH'
    ):
      env['HADOOP_CLASSPATH'] = ':'.join([
          env.get('HADOOP_CLASSPATH'),
          encoding.GetEncodedValue(os.environ, 'HADOOP_CLASSPATH'),
      ])

    # Dry run
    if args.dry_run:
      env['CLOUDSDK_MANAGEDFLINK_DRY_RUN'] = 'true'
      env['CLOUDSDK_MANAGEDFLINK_ECHO_CMD'] = 'true'

    with files.TemporaryDirectory() as temp_dir:
      jar_path = args.job
      if input_type == 'ar://':
        jar_name, registry = flink_backend.CreateRegistryFromArtifactUri(
            args.job
        )
        log.Print(
            'Downloading {0} file from Artifact Registry...'.format(jar_name)
        )
        jar_path = os.path.join(temp_dir, jar_name.split('/')[-1])
        # Will throw an HTTPError if the file does not exist.
        flink_backend.DownloadJarFromArtifactRegistry(
            dest_path=jar_path, artifact_jar_path=registry.RelativeName()
        )
        log.debug('Successfully downloaded the file to ' + jar_path)
      command_executor = flink_backend.FlinkClientWrapper()
      response = command_executor(
          command='run',
          job_type=job_type,
          jar=jar_path,
          target='gcloud',
          deployment=args.deployment,
          staging_location=args.staging_location,
          autotuning_mode=args.autotuning_mode,
          temp_dir=temp_dir,
          network=args.network,
          subnetwork=args.subnetwork,
          name=args.name,
          location=args.location,
          main_class=args.main_class,
          extra_jars=args.extra_jars,
          extra_args=args.job_args,
          extra_archives=args.archives,
          python_venv=args.python_venv,
          env=flink_backend.GetEnvArgsForCommand(env),
      )
      return self._JobSubmitResponseHandler(response, job_type, temp_dir, args)